diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index a21c3fae3..e0a60598d 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -16,7 +16,6 @@ package org.redisson; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; @@ -32,7 +31,6 @@ import org.redisson.api.RBitSetReactive; import org.redisson.api.RBlockingQueueReactive; import org.redisson.api.RBucketReactive; import org.redisson.api.RDequeReactive; -import org.redisson.api.RFuture; import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RKeys; import org.redisson.api.RKeysReactive; @@ -45,6 +43,7 @@ import org.redisson.api.RMapReactive; import org.redisson.api.RPatternTopicReactive; import org.redisson.api.RPermitExpirableSemaphoreReactive; import org.redisson.api.RQueueReactive; +import org.redisson.api.RRateLimiterReactive; import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; @@ -57,7 +56,6 @@ import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.ReferenceCodecProvider; import org.redisson.command.CommandReactiveService; import org.redisson.config.Config; @@ -83,6 +81,7 @@ import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive; import org.redisson.reactive.RedissonQueueReactive; +import org.redisson.reactive.RedissonRateLimiterReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; @@ -121,6 +120,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + @Override + public RRateLimiterReactive getRateLimiter(String name) { + return new RedissonRateLimiterReactive(commandExecutor, name); + } + @Override public RSemaphoreReactive getSemaphore(String name) { return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); diff --git a/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java b/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java new file mode 100644 index 000000000..1934ffca1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java @@ -0,0 +1,144 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RRateLimiterReactive extends RObjectReactive { + + /** + * Initializes RateLimiter's state and stores config to Redis server. + * + * @param mode - rate mode + * @param rate - rate + * @param rateInterval - rate time interval + * @param rateIntervalUnit - rate time interval unit + * @return + */ + Publisher trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit); + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(long permits); + + /** + * Acquires a permit from this RateLimiter, blocking until one is available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + */ + Publisher acquire(); + + /** + * Acquires a specified permits from this RateLimiter, + * blocking until one is available. + * + *

Acquires the given number of permits, if they are available + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits + */ + Publisher acquire(long permits); + + /** + * Acquires a permit from this RateLimiter, if one becomes available + * within the given waiting time. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * specified waiting time elapses. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long timeout, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires the given number of permits, if all are available and returns immediately, + * with the value {@code true}, reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * the specified waiting time elapses. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long permits, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 42803b932..a3862ae51 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,14 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns rate limiter instance by name + * + * @param name of rate limiter + * @return RateLimiter object + */ + RRateLimiterReactive getRateLimiter(String name); + /** * Returns semaphore instance by name * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java index 05d96898c..604395f12 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java @@ -34,12 +34,16 @@ import reactor.fn.Supplier; */ abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { + protected final RExpirableAsync instance; + RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { super(connectionManager, name, instance); + this.instance = instance; } RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { super(codec, connectionManager, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 0e6106052..1da539cd2 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -21,8 +21,8 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonReference; -import org.redisson.api.RExpirableAsync; import org.redisson.api.RFuture; +import org.redisson.api.RObjectAsync; import org.redisson.api.RObjectReactive; import org.redisson.client.codec.Codec; import org.redisson.command.CommandReactiveExecutor; @@ -44,9 +44,9 @@ abstract class RedissonObjectReactive implements RObjectReactive { final CommandReactiveExecutor commandExecutor; private final String name; final Codec codec; - protected RExpirableAsync instance; + protected RObjectAsync instance; - public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; @@ -57,7 +57,7 @@ abstract class RedissonObjectReactive implements RObjectReactive { return commandExecutor.reactive(supplier); } - public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java new file mode 100644 index 000000000..63911757a --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java @@ -0,0 +1,120 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonRateLimiter; +import org.redisson.api.RFuture; +import org.redisson.api.RRateLimiterAsync; +import org.redisson.api.RRateLimiterReactive; +import org.redisson.api.RateIntervalUnit; +import org.redisson.api.RateType; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive { + + private final RRateLimiterAsync instance; + + public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) { + this(connectionManager, name, new RedissonRateLimiter(connectionManager, name)); + } + + private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; + } + + @Override + public Publisher trySetRate(final RateType mode, final long rate, final long rateInterval, + final RateIntervalUnit rateIntervalUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit); + } + }); + } + + @Override + public Publisher tryAcquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(); + } + }); + } + + @Override + public Publisher tryAcquire(final long permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits); + } + }); + } + + @Override + public Publisher acquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(); + } + }); + } + + @Override + public Publisher acquire(final long permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(permits); + } + }); + } + + @Override + public Publisher tryAcquire(final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(timeout, unit); + } + }); + } + + @Override + public Publisher tryAcquire(final long permits, final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits, timeout, unit); + } + }); + } + +}