RRateLimiterReactive object added. #1553
parent
1e7f132742
commit
13305dfcbc
@ -0,0 +1,144 @@
|
||||
/**
|
||||
* Copyright 2018 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public interface RRateLimiterReactive extends RObjectReactive {
|
||||
|
||||
/**
|
||||
* Initializes RateLimiter's state and stores config to Redis server.
|
||||
*
|
||||
* @param mode - rate mode
|
||||
* @param rate - rate
|
||||
* @param rateInterval - rate time interval
|
||||
* @param rateIntervalUnit - rate time interval unit
|
||||
* @return
|
||||
*/
|
||||
Publisher<Boolean> trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
|
||||
|
||||
/**
|
||||
* Acquires a permit only if one is available at the
|
||||
* time of invocation.
|
||||
*
|
||||
* <p>Acquires a permit, if one is available and returns immediately,
|
||||
* with the value {@code true},
|
||||
* reducing the number of available permits by one.
|
||||
*
|
||||
* <p>If no permit is available then this method will return
|
||||
* immediately with the value {@code false}.
|
||||
*
|
||||
* @return {@code true} if a permit was acquired and {@code false}
|
||||
* otherwise
|
||||
*/
|
||||
Publisher<Boolean> tryAcquire();
|
||||
|
||||
/**
|
||||
* Acquires the given number of <code>permits</code> only if all are available at the
|
||||
* time of invocation.
|
||||
*
|
||||
* <p>Acquires a permits, if all are available and returns immediately,
|
||||
* with the value {@code true},
|
||||
* reducing the number of available permits by given number of permits.
|
||||
*
|
||||
* <p>If no permits are available then this method will return
|
||||
* immediately with the value {@code false}.
|
||||
*
|
||||
* @param permits the number of permits to acquire
|
||||
* @return {@code true} if a permit was acquired and {@code false}
|
||||
* otherwise
|
||||
*/
|
||||
Publisher<Boolean> tryAcquire(long permits);
|
||||
|
||||
/**
|
||||
* Acquires a permit from this RateLimiter, blocking until one is available.
|
||||
*
|
||||
* <p>Acquires a permit, if one is available and returns immediately,
|
||||
* reducing the number of available permits by one.
|
||||
*
|
||||
*/
|
||||
Publisher<Void> acquire();
|
||||
|
||||
/**
|
||||
* Acquires a specified <code>permits</code> from this RateLimiter,
|
||||
* blocking until one is available.
|
||||
*
|
||||
* <p>Acquires the given number of permits, if they are available
|
||||
* and returns immediately, reducing the number of available permits
|
||||
* by the given amount.
|
||||
*
|
||||
* @param permits
|
||||
*/
|
||||
Publisher<Void> acquire(long permits);
|
||||
|
||||
/**
|
||||
* Acquires a permit from this RateLimiter, if one becomes available
|
||||
* within the given waiting time.
|
||||
*
|
||||
* <p>Acquires a permit, if one is available and returns immediately,
|
||||
* with the value {@code true},
|
||||
* reducing the number of available permits by one.
|
||||
*
|
||||
* <p>If no permit is available then the current thread becomes
|
||||
* disabled for thread scheduling purposes and lies dormant until
|
||||
* specified waiting time elapses.
|
||||
*
|
||||
* <p>If a permit is acquired then the value {@code true} is returned.
|
||||
*
|
||||
* <p>If the specified waiting time elapses then the value {@code false}
|
||||
* is returned. If the time is less than or equal to zero, the method
|
||||
* will not wait at all.
|
||||
*
|
||||
* @param timeout the maximum time to wait for a permit
|
||||
* @param unit the time unit of the {@code timeout} argument
|
||||
* @return {@code true} if a permit was acquired and {@code false}
|
||||
* if the waiting time elapsed before a permit was acquired
|
||||
*/
|
||||
Publisher<Boolean> tryAcquire(long timeout, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Acquires the given number of <code>permits</code> only if all are available
|
||||
* within the given waiting time.
|
||||
*
|
||||
* <p>Acquires the given number of permits, if all are available and returns immediately,
|
||||
* with the value {@code true}, reducing the number of available permits by one.
|
||||
*
|
||||
* <p>If no permit is available then the current thread becomes
|
||||
* disabled for thread scheduling purposes and lies dormant until
|
||||
* the specified waiting time elapses.
|
||||
*
|
||||
* <p>If a permits is acquired then the value {@code true} is returned.
|
||||
*
|
||||
* <p>If the specified waiting time elapses then the value {@code false}
|
||||
* is returned. If the time is less than or equal to zero, the method
|
||||
* will not wait at all.
|
||||
*
|
||||
* @param permits amount
|
||||
* @param timeout the maximum time to wait for a permit
|
||||
* @param unit the time unit of the {@code timeout} argument
|
||||
* @return {@code true} if a permit was acquired and {@code false}
|
||||
* if the waiting time elapsed before a permit was acquired
|
||||
*/
|
||||
Publisher<Boolean> tryAcquire(long permits, long timeout, TimeUnit unit);
|
||||
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Copyright 2018 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.reactive;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.redisson.RedissonRateLimiter;
|
||||
import org.redisson.api.RFuture;
|
||||
import org.redisson.api.RRateLimiterAsync;
|
||||
import org.redisson.api.RRateLimiterReactive;
|
||||
import org.redisson.api.RateIntervalUnit;
|
||||
import org.redisson.api.RateType;
|
||||
import org.redisson.command.CommandReactiveExecutor;
|
||||
|
||||
import reactor.fn.Supplier;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive {
|
||||
|
||||
private final RRateLimiterAsync instance;
|
||||
|
||||
public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) {
|
||||
this(connectionManager, name, new RedissonRateLimiter(connectionManager, name));
|
||||
}
|
||||
|
||||
private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) {
|
||||
super(connectionManager, name, instance);
|
||||
this.instance = instance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> trySetRate(final RateType mode, final long rate, final long rateInterval,
|
||||
final RateIntervalUnit rateIntervalUnit) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryAcquire() {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryAcquireAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryAcquire(final long permits) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryAcquireAsync(permits);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> acquire() {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.acquireAsync();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> acquire(final long permits) {
|
||||
return reactive(new Supplier<RFuture<Void>>() {
|
||||
@Override
|
||||
public RFuture<Void> get() {
|
||||
return instance.acquireAsync(permits);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryAcquire(final long timeout, final TimeUnit unit) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryAcquireAsync(timeout, unit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Boolean> tryAcquire(final long permits, final long timeout, final TimeUnit unit) {
|
||||
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||
@Override
|
||||
public RFuture<Boolean> get() {
|
||||
return instance.tryAcquireAsync(permits, timeout, unit);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue