diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 1cf745faa..9b1751952 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -60,6 +60,7 @@ import org.redisson.api.RPriorityBlockingQueue; import org.redisson.api.RPriorityDeque; import org.redisson.api.RPriorityQueue; import org.redisson.api.RQueue; +import org.redisson.api.RRateLimiter; import org.redisson.api.RReadWriteLock; import org.redisson.api.RRemoteService; import org.redisson.api.RScheduledExecutorService; @@ -209,6 +210,11 @@ public class Redisson implements RedissonClient { return new RedissonBucket(connectionManager.getCommandExecutor(), name); } + @Override + public RRateLimiter getRateLimiter(String name) { + return new RedissonRateLimiter(connectionManager.getCommandExecutor(), name); + } + @Override public RBucket getBucket(String name, Codec codec) { return new RedissonBucket(codec, connectionManager.getCommandExecutor(), name); diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java new file mode 100644 index 000000000..aca3af2e1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -0,0 +1,231 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RFuture; +import org.redisson.api.RRateLimiter; +import org.redisson.api.RateIntervalUnit; +import org.redisson.api.RateType; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonRateLimiter extends RedissonObject implements RRateLimiter { + + public RedissonRateLimiter(CommandAsyncExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + String getValueName() { + return suffixName(getName(), "value"); + } + + String getClientValueName() { + return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString()); + } + + @Override + public boolean tryAcquire() { + return tryAcquire(1); + } + + @Override + public RFuture tryAcquireAsync() { + return tryAcquireAsync(1L); + } + + @Override + public boolean tryAcquire(long permits) { + return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits)); + } + + @Override + public RFuture tryAcquireAsync(long permits) { + return tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits); + } + + @Override + public void acquire() { + get(acquireAsync()); + } + + @Override + public RFuture acquireAsync() { + return acquireAsync(1); + } + + @Override + public void acquire(long permits) { + get(acquireAsync(permits)); + } + + @Override + public RFuture acquireAsync(long permits) { + final RPromise promise = new RedissonPromise(); + tryAcquireAsync(permits, -1, null).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(null); + } + }); + return promise; + } + + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) { + return get(tryAcquireAsync(timeout, unit)); + } + + @Override + public RFuture tryAcquireAsync(long timeout, TimeUnit unit) { + return tryAcquireAsync(1, timeout, unit); + } + + @Override + public boolean tryAcquire(long permits, long timeout, TimeUnit unit) { + return get(tryAcquireAsync(permits, timeout, unit)); + } + + @Override + public RFuture tryAcquireAsync(long permits, long timeout, TimeUnit unit) { + RPromise promise = new RedissonPromise(); + tryAcquireAsync(permits, promise, unit.toMillis(timeout)); + return promise; + } + + private void tryAcquireAsync(final long permits, final RPromise promise, final long timeoutInMillis) { + final long start = System.currentTimeMillis(); + RFuture future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + Long delay = future.getNow(); + if (delay == null) { + promise.trySuccess(true); + return; + } + + if (timeoutInMillis == -1) { + commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + @Override + public void run() { + tryAcquireAsync(permits, promise, timeoutInMillis); + } + }, delay, TimeUnit.SECONDS); + return; + } + + long elapsed = System.currentTimeMillis() - start; + final long remains = timeoutInMillis - elapsed; + if (remains <= 0) { + promise.trySuccess(false); + return; + } + if (remains < delay) { + commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + @Override + public void run() { + promise.trySuccess(false); + } + }, remains, TimeUnit.SECONDS); + } else { + final long start = System.currentTimeMillis(); + commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + @Override + public void run() { + long elapsed = System.currentTimeMillis() - start; + if (remains <= elapsed) { + promise.trySuccess(false); + return; + } + + tryAcquireAsync(permits, promise, remains - elapsed); + } + }, delay, TimeUnit.SECONDS); + } + } + }); + } + + private RFuture tryAcquireAsync(RedisCommand command, Long value) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, + "local rate = redis.call('hget', KEYS[1], 'rate');" + + "local interval = redis.call('hget', KEYS[1], 'interval');" + + "local type = redis.call('hget', KEYS[1], 'type');" + + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" + + + "local valueName = KEYS[2];" + + "if type == 1 then " + + "valueName = KEYS[3];" + + "end;" + + + "local currentValue = redis.call('get', valueName); " + + "if currentValue ~= false then " + + "if tonumber(currentValue) < tonumber(ARGV[1]) then " + + "return redis.call('pttl', valueName); " + + "else " + + "redis.call('decrby', valueName, ARGV[1]); " + + "return nil; " + + "end; " + + "else " + + "redis.call('set', valueName, rate, 'px', interval); " + + "redis.call('decrby', valueName, ARGV[1]); " + + "return nil; " + + "end;", + Arrays.asList(getName(), getValueName(), getClientValueName()), + value, commandExecutor.getConnectionManager().getId().toString()); + } + + @Override + public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { + return get(trySetRateAsync(type, rate, rateInterval, unit)); + } + + @Override + public RFuture trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);" + + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);" + + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);", + Collections.singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal()); + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RRateLimiter.java b/redisson/src/main/java/org/redisson/api/RRateLimiter.java new file mode 100644 index 000000000..8471be4fb --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RRateLimiter.java @@ -0,0 +1,142 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RRateLimiter extends RRateLimiterAsync, RObject { + + /** + * Initializes RateLimiter's state and stores config to Redis server. + * + * @param mode - rate mode + * @param rate - rate + * @param rateInterval - rate time interval + * @param rateIntervalUnit - rate time interval unit + * @return + */ + boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit); + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + boolean tryAcquire(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + boolean tryAcquire(long permits); + + /** + * Acquires a permit from this RateLimiter, blocking until one is available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + */ + void acquire(); + + /** + * Acquires a specified permits from this RateLimiter, + * blocking until one is available. + * + *

Acquires the given number of permits, if they are available + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits + */ + void acquire(long permits); + + /** + * Acquires a permit from this RateLimiter, if one becomes available + * within the given waiting time. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * specified waiting time elapses. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + boolean tryAcquire(long timeout, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires the given number of permits, if all are available and returns immediately, + * with the value {@code true}, reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * the specified waiting time elapses. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + boolean tryAcquire(long permits, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RRateLimiterAsync.java b/redisson/src/main/java/org/redisson/api/RRateLimiterAsync.java new file mode 100644 index 000000000..06391a03e --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RRateLimiterAsync.java @@ -0,0 +1,142 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RRateLimiterAsync extends RObjectAsync { + + /** + * Initializes RateLimiter's state and stores config to Redis server. + * + * @param mode - rate mode + * @param rate - rate + * @param rateInterval - rate time interval + * @param rateIntervalUnit - rate time interval unit + * @return + */ + RFuture trySetRateAsync(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit); + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + RFuture tryAcquireAsync(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + RFuture tryAcquireAsync(long permits); + + /** + * Acquires a permit from this RateLimiter, blocking until one is available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + */ + RFuture acquireAsync(); + + /** + * Acquires a specified permits from this RateLimiter, + * blocking until one is available. + * + *

Acquires the given number of permits, if they are available + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits + */ + RFuture acquireAsync(long permits); + + /** + * Acquires a permit from this RateLimiter, if one becomes available + * within the given waiting time. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * specified waiting time elapses. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + RFuture tryAcquireAsync(long timeout, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires the given number of permits, if all are available and returns immediately, + * with the value {@code true}, reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * the specified waiting time elapses. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + RFuture tryAcquireAsync(long permits, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RateIntervalUnit.java b/redisson/src/main/java/org/redisson/api/RateIntervalUnit.java new file mode 100644 index 000000000..fab16934d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RateIntervalUnit.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +/** + * + * @author Nikita Koksharov + * + */ +public enum RateIntervalUnit { + + SECONDS { + @Override + public long toMillis(long value) { + return TimeUnit.SECONDS.toMillis(value); + } + }, + + MINUTES { + @Override + public long toMillis(long value) { + return TimeUnit.MINUTES.toMillis(value); + } + }, + + HOURS { + @Override + public long toMillis(long value) { + return TimeUnit.HOURS.toMillis(value); + } + }, + + DAYS { + @Override + public long toMillis(long value) { + return TimeUnit.DAYS.toMillis(value); + } + }; + + public abstract long toMillis(long value); +} diff --git a/redisson/src/main/java/org/redisson/api/RateType.java b/redisson/src/main/java/org/redisson/api/RateType.java new file mode 100644 index 000000000..9609d9e57 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RateType.java @@ -0,0 +1,35 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * + * @author Nikita Koksharov + * + */ +public enum RateType { + + /** + * Total rate for all RateLimiter instances + */ + OVERALL, + + /** + * Total rate for all RateLimiter instances working with the same Redisson instance + */ + PER_CLIENT + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index a084c47de..c858888a8 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -29,6 +29,14 @@ import org.redisson.config.Config; */ public interface RedissonClient { + /** + * Returns rate limiter instance by name + * + * @param name of rate limiter + * @return RateLimiter object + */ + RRateLimiter getRateLimiter(String name); + /** * Returns binary stream holder instance by name * diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java new file mode 100644 index 000000000..15b997fbd --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -0,0 +1,83 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Test; +import org.redisson.api.RRateLimiter; +import org.redisson.api.RateIntervalUnit; +import org.redisson.api.RateType; + +public class RedissonRateLimiterTest extends BaseTest { + + @Test + public void test() throws InterruptedException { + RRateLimiter rr = redisson.getRateLimiter("test"); + assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); + assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse(); + + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 10; i++) { + assertThat(rr.tryAcquire()).isTrue(); + } + for (int i = 0; i < 10; i++) { + assertThat(rr.tryAcquire()).isFalse(); + } + Thread.sleep(1000); + } + } + + @Test + public void test3() throws InterruptedException { + RRateLimiter rr = redisson.getRateLimiter("test"); + assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); + assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse(); + + Queue queue = new ConcurrentLinkedQueue(); + AtomicLong counter = new AtomicLong(); + ExecutorService pool = Executors.newFixedThreadPool(8); + for (int i = 0; i < 8; i++) { + pool.execute(new Runnable() { + @Override + public void run() { + while (true) { + if (rr.tryAcquire()) { + queue.add(System.currentTimeMillis()); + if (counter.incrementAndGet() > 500) { + break; + } + } + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + }); + } + + pool.shutdown(); + pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + int count = 0; + long start = 0; + for (Long value : queue) { + if (count % 10 == 0) { + if (start > 0) { + assertThat(value - start).isGreaterThan(999); + } + start = value; + } + count++; + } + } + +}