RRateLimiter added. #75
parent
daac20fcd2
commit
733f64d5af
@ -0,0 +1,231 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2018 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.redisson.api.RFuture;
|
||||||
|
import org.redisson.api.RRateLimiter;
|
||||||
|
import org.redisson.api.RateIntervalUnit;
|
||||||
|
import org.redisson.api.RateType;
|
||||||
|
import org.redisson.client.codec.LongCodec;
|
||||||
|
import org.redisson.client.protocol.RedisCommand;
|
||||||
|
import org.redisson.client.protocol.RedisCommands;
|
||||||
|
import org.redisson.command.CommandAsyncExecutor;
|
||||||
|
import org.redisson.misc.RPromise;
|
||||||
|
import org.redisson.misc.RedissonPromise;
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.FutureListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class RedissonRateLimiter extends RedissonObject implements RRateLimiter {
|
||||||
|
|
||||||
|
public RedissonRateLimiter(CommandAsyncExecutor commandExecutor, String name) {
|
||||||
|
super(commandExecutor, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
String getValueName() {
|
||||||
|
return suffixName(getName(), "value");
|
||||||
|
}
|
||||||
|
|
||||||
|
String getClientValueName() {
|
||||||
|
return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAcquire() {
|
||||||
|
return tryAcquire(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> tryAcquireAsync() {
|
||||||
|
return tryAcquireAsync(1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAcquire(long permits) {
|
||||||
|
return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> tryAcquireAsync(long permits) {
|
||||||
|
return tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acquire() {
|
||||||
|
get(acquireAsync());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> acquireAsync() {
|
||||||
|
return acquireAsync(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acquire(long permits) {
|
||||||
|
get(acquireAsync(permits));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> acquireAsync(long permits) {
|
||||||
|
final RPromise<Void> promise = new RedissonPromise<Void>();
|
||||||
|
tryAcquireAsync(permits, -1, null).addListener(new FutureListener<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Boolean> future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
promise.tryFailure(future.cause());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
promise.trySuccess(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAcquire(long timeout, TimeUnit unit) {
|
||||||
|
return get(tryAcquireAsync(timeout, unit));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> tryAcquireAsync(long timeout, TimeUnit unit) {
|
||||||
|
return tryAcquireAsync(1, timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryAcquire(long permits, long timeout, TimeUnit unit) {
|
||||||
|
return get(tryAcquireAsync(permits, timeout, unit));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
|
||||||
|
RPromise<Boolean> promise = new RedissonPromise<Boolean>();
|
||||||
|
tryAcquireAsync(permits, promise, unit.toMillis(timeout));
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryAcquireAsync(final long permits, final RPromise<Boolean> promise, final long timeoutInMillis) {
|
||||||
|
final long start = System.currentTimeMillis();
|
||||||
|
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
|
||||||
|
future.addListener(new FutureListener<Long>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Long> future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
promise.tryFailure(future.cause());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Long delay = future.getNow();
|
||||||
|
if (delay == null) {
|
||||||
|
promise.trySuccess(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeoutInMillis == -1) {
|
||||||
|
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
tryAcquireAsync(permits, promise, timeoutInMillis);
|
||||||
|
}
|
||||||
|
}, delay, TimeUnit.SECONDS);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
final long remains = timeoutInMillis - elapsed;
|
||||||
|
if (remains <= 0) {
|
||||||
|
promise.trySuccess(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (remains < delay) {
|
||||||
|
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
promise.trySuccess(false);
|
||||||
|
}
|
||||||
|
}, remains, TimeUnit.SECONDS);
|
||||||
|
} else {
|
||||||
|
final long start = System.currentTimeMillis();
|
||||||
|
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long elapsed = System.currentTimeMillis() - start;
|
||||||
|
if (remains <= elapsed) {
|
||||||
|
promise.trySuccess(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tryAcquireAsync(permits, promise, remains - elapsed);
|
||||||
|
}
|
||||||
|
}, delay, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
|
||||||
|
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
|
||||||
|
"local rate = redis.call('hget', KEYS[1], 'rate');"
|
||||||
|
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
|
||||||
|
+ "local type = redis.call('hget', KEYS[1], 'type');"
|
||||||
|
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
|
||||||
|
|
||||||
|
+ "local valueName = KEYS[2];"
|
||||||
|
+ "if type == 1 then "
|
||||||
|
+ "valueName = KEYS[3];"
|
||||||
|
+ "end;"
|
||||||
|
|
||||||
|
+ "local currentValue = redis.call('get', valueName); "
|
||||||
|
+ "if currentValue ~= false then "
|
||||||
|
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
|
||||||
|
+ "return redis.call('pttl', valueName); "
|
||||||
|
+ "else "
|
||||||
|
+ "redis.call('decrby', valueName, ARGV[1]); "
|
||||||
|
+ "return nil; "
|
||||||
|
+ "end; "
|
||||||
|
+ "else "
|
||||||
|
+ "redis.call('set', valueName, rate, 'px', interval); "
|
||||||
|
+ "redis.call('decrby', valueName, ARGV[1]); "
|
||||||
|
+ "return nil; "
|
||||||
|
+ "end;",
|
||||||
|
Arrays.<Object>asList(getName(), getValueName(), getClientValueName()),
|
||||||
|
value, commandExecutor.getConnectionManager().getId().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
|
||||||
|
return get(trySetRateAsync(type, rate, rateInterval, unit));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
|
||||||
|
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
||||||
|
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
|
||||||
|
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
|
||||||
|
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
|
||||||
|
Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,142 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2018 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.api;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface RRateLimiter extends RRateLimiterAsync, RObject {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes RateLimiter's state and stores config to Redis server.
|
||||||
|
*
|
||||||
|
* @param mode - rate mode
|
||||||
|
* @param rate - rate
|
||||||
|
* @param rateInterval - rate time interval
|
||||||
|
* @param rateIntervalUnit - rate time interval unit
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit only if one is available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
boolean tryAcquire();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of <code>permits</code> only if all are available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by given number of permits.
|
||||||
|
*
|
||||||
|
* <p>If no permits are available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @param permits the number of permits to acquire
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
boolean tryAcquire(long permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit from this RateLimiter, blocking until one is available.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
void acquire();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a specified <code>permits</code> from this RateLimiter,
|
||||||
|
* blocking until one is available.
|
||||||
|
*
|
||||||
|
* <p>Acquires the given number of permits, if they are available
|
||||||
|
* and returns immediately, reducing the number of available permits
|
||||||
|
* by the given amount.
|
||||||
|
*
|
||||||
|
* @param permits
|
||||||
|
*/
|
||||||
|
void acquire(long permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit from this RateLimiter, if one becomes available
|
||||||
|
* within the given waiting time.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then the current thread becomes
|
||||||
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
|
* specified waiting time elapses.
|
||||||
|
*
|
||||||
|
* <p>If a permit is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param timeout the maximum time to wait for a permit
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
boolean tryAcquire(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of <code>permits</code> only if all are available
|
||||||
|
* within the given waiting time.
|
||||||
|
*
|
||||||
|
* <p>Acquires the given number of permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true}, reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then the current thread becomes
|
||||||
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
|
* the specified waiting time elapses.
|
||||||
|
*
|
||||||
|
* <p>If a permits is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param permits amount
|
||||||
|
* @param timeout the maximum time to wait for a permit
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
boolean tryAcquire(long permits, long timeout, TimeUnit unit);
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,142 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2018 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.api;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface RRateLimiterAsync extends RObjectAsync {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes RateLimiter's state and stores config to Redis server.
|
||||||
|
*
|
||||||
|
* @param mode - rate mode
|
||||||
|
* @param rate - rate
|
||||||
|
* @param rateInterval - rate time interval
|
||||||
|
* @param rateIntervalUnit - rate time interval unit
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
RFuture<Boolean> trySetRateAsync(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit only if one is available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
RFuture<Boolean> tryAcquireAsync();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of <code>permits</code> only if all are available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by given number of permits.
|
||||||
|
*
|
||||||
|
* <p>If no permits are available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @param permits the number of permits to acquire
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
RFuture<Boolean> tryAcquireAsync(long permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit from this RateLimiter, blocking until one is available.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
RFuture<Void> acquireAsync();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a specified <code>permits</code> from this RateLimiter,
|
||||||
|
* blocking until one is available.
|
||||||
|
*
|
||||||
|
* <p>Acquires the given number of permits, if they are available
|
||||||
|
* and returns immediately, reducing the number of available permits
|
||||||
|
* by the given amount.
|
||||||
|
*
|
||||||
|
* @param permits
|
||||||
|
*/
|
||||||
|
RFuture<Void> acquireAsync(long permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit from this RateLimiter, if one becomes available
|
||||||
|
* within the given waiting time.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then the current thread becomes
|
||||||
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
|
* specified waiting time elapses.
|
||||||
|
*
|
||||||
|
* <p>If a permit is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param timeout the maximum time to wait for a permit
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
RFuture<Boolean> tryAcquireAsync(long timeout, TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of <code>permits</code> only if all are available
|
||||||
|
* within the given waiting time.
|
||||||
|
*
|
||||||
|
* <p>Acquires the given number of permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true}, reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then the current thread becomes
|
||||||
|
* disabled for thread scheduling purposes and lies dormant until
|
||||||
|
* the specified waiting time elapses.
|
||||||
|
*
|
||||||
|
* <p>If a permits is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param permits amount
|
||||||
|
* @param timeout the maximum time to wait for a permit
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit);
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,56 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2018 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.api;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public enum RateIntervalUnit {
|
||||||
|
|
||||||
|
SECONDS {
|
||||||
|
@Override
|
||||||
|
public long toMillis(long value) {
|
||||||
|
return TimeUnit.SECONDS.toMillis(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
MINUTES {
|
||||||
|
@Override
|
||||||
|
public long toMillis(long value) {
|
||||||
|
return TimeUnit.MINUTES.toMillis(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
HOURS {
|
||||||
|
@Override
|
||||||
|
public long toMillis(long value) {
|
||||||
|
return TimeUnit.HOURS.toMillis(value);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
DAYS {
|
||||||
|
@Override
|
||||||
|
public long toMillis(long value) {
|
||||||
|
return TimeUnit.DAYS.toMillis(value);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public abstract long toMillis(long value);
|
||||||
|
}
|
@ -0,0 +1,35 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2018 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.api;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public enum RateType {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total rate for all RateLimiter instances
|
||||||
|
*/
|
||||||
|
OVERALL,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total rate for all RateLimiter instances working with the same Redisson instance
|
||||||
|
*/
|
||||||
|
PER_CLIENT
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
package org.redisson;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.redisson.api.RRateLimiter;
|
||||||
|
import org.redisson.api.RateIntervalUnit;
|
||||||
|
import org.redisson.api.RateType;
|
||||||
|
|
||||||
|
public class RedissonRateLimiterTest extends BaseTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws InterruptedException {
|
||||||
|
RRateLimiter rr = redisson.getRateLimiter("test");
|
||||||
|
assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue();
|
||||||
|
assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse();
|
||||||
|
|
||||||
|
for (int j = 0; j < 3; j++) {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertThat(rr.tryAcquire()).isTrue();
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertThat(rr.tryAcquire()).isFalse();
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test3() throws InterruptedException {
|
||||||
|
RRateLimiter rr = redisson.getRateLimiter("test");
|
||||||
|
assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue();
|
||||||
|
assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse();
|
||||||
|
|
||||||
|
Queue<Long> queue = new ConcurrentLinkedQueue<Long>();
|
||||||
|
AtomicLong counter = new AtomicLong();
|
||||||
|
ExecutorService pool = Executors.newFixedThreadPool(8);
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
pool.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
if (rr.tryAcquire()) {
|
||||||
|
queue.add(System.currentTimeMillis());
|
||||||
|
if (counter.incrementAndGet() > 500) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.shutdown();
|
||||||
|
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
long start = 0;
|
||||||
|
for (Long value : queue) {
|
||||||
|
if (count % 10 == 0) {
|
||||||
|
if (start > 0) {
|
||||||
|
assertThat(value - start).isGreaterThan(999);
|
||||||
|
}
|
||||||
|
start = value;
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue