RPermitExpirableSemaphoreReactive object added. #1391

pull/1423/head
Nikita 7 years ago
parent 92496e96ab
commit 2cea6711ff

@ -24,7 +24,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub;
@ -45,13 +45,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
private final SemaphorePubSub semaphorePubSub;
final CommandExecutor commandExecutor;
final CommandAsyncExecutor commandExecutor;
private final String timeoutName;
private final long nonExpirableTimeout = 922337203685477L;
protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
super(commandExecutor, name);
this.timeoutName = suffixName(name, "timeout");
this.commandExecutor = commandExecutor;

@ -41,6 +41,7 @@ import org.redisson.api.RLockReactive;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive;
@ -76,6 +77,7 @@ import org.redisson.reactive.RedissonLockReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
@ -119,6 +121,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);
}
@Override
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub);
}
@Override
public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name);

@ -0,0 +1,224 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
* Semaphore object with support of lease time parameter for each acquired permit.
*
* <p>Each permit identified by own id and could be released only using its id.
* Permit id is a 128-bits unique random identifier generated each time during acquiring.
*
* <p>Works in non-fair mode. Therefore order of acquiring is unpredictable.
*
* @author Nikita Koksharov
*
*/
public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @return permit id
*/
Publisher<String> acquire();
/**
* Acquires a permit with defined lease time from this semaphore,
* blocking until one is available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns its id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param leaseTime - permit lease time
* @param unit - time unit
* @return permit id
*/
Publisher<String> acquire(long leaseTime, TimeUnit unit);
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code null}.
*
* @return permit id if a permit was acquired and {@code null}
* otherwise
*/
Publisher<String> tryAcquire();
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<String> tryAcquire(long waitTime, TimeUnit unit);
/**
* Acquires a permit with defined lease time from this semaphore,
* if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the permit id,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the permit id is returned.
*
* <p>If the specified waiting time elapses then the value {@code null}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param leaseTime permit lease time
* @param unit the time unit of the {@code timeout} argument
* @return permit id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitId - permit id
* @return {@code true} if a permit has been released and {@code false}
* otherwise
*/
Publisher<Boolean> tryRelease(String permitId);
/**
* Releases a permit by its id, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* <p>Throws an exception if permit id doesn't exist or has already been release
*
* @param permitId - permit id
* @return void
*/
Publisher<Void> release(String permitId);
/**
* Returns the current number of available permits.
*
* @return number of available permits
*/
Publisher<Integer> availablePermits();
/**
* Sets number of permits.
*
* @param permits - number of permits
* @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.
*/
Publisher<Boolean> trySetPermits(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*
* @param permits - number of permits to add/remove
* @return void
*/
Publisher<Void> addPermits(int permits);
}

@ -38,6 +38,15 @@ public interface RedissonReactiveClient {
*/
RSemaphoreReactive getSemaphore(String name);
/**
* Returns semaphore instance by name.
* Supports lease time parameter for each acquired permit.
*
* @param name - name of object
* @return PermitExpirableSemaphore object
*/
RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name);
/**
* Returns readWriteLock instance by name.
*

@ -0,0 +1,151 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
import org.redisson.RedissonPermitExpirableSemaphore;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RPermitExpirableSemaphoreAsync;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.pubsub.SemaphorePubSub;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableReactive implements RPermitExpirableSemaphoreReactive {
private final RPermitExpirableSemaphoreAsync instance;
public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name);
instance = new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub);
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return new RedissonLock(commandExecutor, name);
}
@Override
public Publisher<String> acquire() {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<String> acquire(final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.acquireAsync(leaseTime, unit);
}
});
}
@Override
public Publisher<String> tryAcquire() {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<String> tryAcquire(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync(waitTime, unit);
}
});
}
@Override
public Publisher<String> tryAcquire(final long waitTime, final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync(waitTime, leaseTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryRelease(final String permitId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryReleaseAsync(permitId);
}
});
}
@Override
public Publisher<Void> release(final String permitId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync(permitId);
}
});
}
@Override
public Publisher<Integer> availablePermits() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.availablePermitsAsync();
}
});
}
@Override
public Publisher<Boolean> trySetPermits(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetPermitsAsync(permits);
}
});
}
@Override
public Publisher<Void> addPermits(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addPermitsAsync(permits);
}
});
}
}
Loading…
Cancel
Save