RSemaphoreReactive added #977
parent
3c0449eda7
commit
1b89afbd6f
@ -0,0 +1,179 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2016 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.api;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface RSemaphoreReactive extends RExpirableReactive {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit only if one is available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If no permit is available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
Publisher<Boolean> tryAcquire();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of permits only if all are available at the
|
||||||
|
* time of invocation.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by given number of permits.
|
||||||
|
*
|
||||||
|
* <p>If no permits are available then this method will return
|
||||||
|
* immediately with the value {@code false}.
|
||||||
|
*
|
||||||
|
* @param permits the number of permits to acquire
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* otherwise
|
||||||
|
*/
|
||||||
|
Publisher<Boolean> tryAcquire(int permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires a permit from this semaphore.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
Publisher<Void> acquire();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of permits, if they are available,
|
||||||
|
* and returns immediately, reducing the number of available permits
|
||||||
|
* by the given amount.
|
||||||
|
*
|
||||||
|
* @param permits the number of permits to acquire
|
||||||
|
* @throws IllegalArgumentException if {@code permits} is negative
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
Publisher<Void> acquire(int permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Releases a permit, returning it to the semaphore.
|
||||||
|
*
|
||||||
|
* <p>Releases a permit, increasing the number of available permits by
|
||||||
|
* one. If any threads of Redisson client are trying to acquire a permit,
|
||||||
|
* then one is selected and given the permit that was just released.
|
||||||
|
*
|
||||||
|
* <p>There is no requirement that a thread that releases a permit must
|
||||||
|
* have acquired that permit by calling {@link #acquire()}.
|
||||||
|
* Correct usage of a semaphore is established by programming convention
|
||||||
|
* in the application.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
Publisher<Void> release();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Releases the given number of permits, returning them to the semaphore.
|
||||||
|
*
|
||||||
|
* <p>Releases the given number of permits, increasing the number of available permits by
|
||||||
|
* the given number of permits. If any threads of Redisson client are trying to
|
||||||
|
* acquire a permits, then next threads is selected and tries to acquire the permits that was just released.
|
||||||
|
*
|
||||||
|
* <p>There is no requirement that a thread that releases a permits must
|
||||||
|
* have acquired that permit by calling {@link #acquire()}.
|
||||||
|
* Correct usage of a semaphore is established by programming convention
|
||||||
|
* in the application.
|
||||||
|
*
|
||||||
|
* @param permits amount
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
Publisher<Void> release(int permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets number of permits.
|
||||||
|
*
|
||||||
|
* @param permits - number of permits
|
||||||
|
* @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.
|
||||||
|
*/
|
||||||
|
Publisher<Boolean> trySetPermits(int permits);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Acquires a permit, if one is available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If a permit is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param waitTime the maximum time to wait for a permit
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
Publisher<Boolean> tryAcquire(long waitTime, TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquires the given number of permits only if all are available
|
||||||
|
* within the given waiting time.
|
||||||
|
*
|
||||||
|
* <p>Acquires a permits, if all are available and returns immediately,
|
||||||
|
* with the value {@code true},
|
||||||
|
* reducing the number of available permits by one.
|
||||||
|
*
|
||||||
|
* <p>If a permits is acquired then the value {@code true} is returned.
|
||||||
|
*
|
||||||
|
* <p>If the specified waiting time elapses then the value {@code false}
|
||||||
|
* is returned. If the time is less than or equal to zero, the method
|
||||||
|
* will not wait at all.
|
||||||
|
*
|
||||||
|
* @param permits amount
|
||||||
|
* @param waitTime the maximum time to wait for a available permits
|
||||||
|
* @param unit the time unit of the {@code timeout} argument
|
||||||
|
* @return {@code true} if a permit was acquired and {@code false}
|
||||||
|
* if the waiting time elapsed before a permit was acquired
|
||||||
|
*/
|
||||||
|
Publisher<Boolean> tryAcquire(int permits, long waitTime, TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shrinks the number of available permits by the indicated
|
||||||
|
* reduction. This method can be useful in subclasses that use
|
||||||
|
* semaphores to track resources that become unavailable. This
|
||||||
|
* method differs from {@link #acquire()} in that it does not block
|
||||||
|
* waiting for permits to become available.
|
||||||
|
*
|
||||||
|
* @param permits - reduction the number of permits to remove
|
||||||
|
* @return void
|
||||||
|
* @throws IllegalArgumentException if {@code reduction} is negative
|
||||||
|
*/
|
||||||
|
Publisher<Void> reducePermits(int permits);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,152 @@
|
|||||||
|
/**
|
||||||
|
* Copyright 2016 Nikita Koksharov
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.redisson.reactive;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
import org.redisson.RedissonLock;
|
||||||
|
import org.redisson.RedissonSemaphore;
|
||||||
|
import org.redisson.api.RFuture;
|
||||||
|
import org.redisson.api.RLockAsync;
|
||||||
|
import org.redisson.api.RSemaphoreAsync;
|
||||||
|
import org.redisson.api.RSemaphoreReactive;
|
||||||
|
import org.redisson.command.CommandAsyncExecutor;
|
||||||
|
import org.redisson.command.CommandReactiveExecutor;
|
||||||
|
import org.redisson.pubsub.SemaphorePubSub;
|
||||||
|
|
||||||
|
import reactor.fn.Supplier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author Nikita Koksharov
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive {
|
||||||
|
|
||||||
|
private final RSemaphoreAsync instance;
|
||||||
|
|
||||||
|
public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
|
||||||
|
super(connectionManager, name);
|
||||||
|
instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
|
||||||
|
return new RedissonLock(commandExecutor, name, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Boolean> tryAcquire() {
|
||||||
|
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> get() {
|
||||||
|
return instance.tryAcquireAsync();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Boolean> tryAcquire(final int permits) {
|
||||||
|
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> get() {
|
||||||
|
return instance.tryAcquireAsync(permits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Void> acquire() {
|
||||||
|
return reactive(new Supplier<RFuture<Void>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> get() {
|
||||||
|
return instance.acquireAsync();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Void> acquire(final int permits) {
|
||||||
|
return reactive(new Supplier<RFuture<Void>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> get() {
|
||||||
|
return instance.acquireAsync(permits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Void> release() {
|
||||||
|
return reactive(new Supplier<RFuture<Void>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> get() {
|
||||||
|
return instance.releaseAsync();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Void> release(final int permits) {
|
||||||
|
return reactive(new Supplier<RFuture<Void>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> get() {
|
||||||
|
return instance.releaseAsync(permits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Boolean> trySetPermits(final int permits) {
|
||||||
|
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> get() {
|
||||||
|
return instance.trySetPermitsAsync(permits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Boolean> tryAcquire(final long waitTime, final TimeUnit unit) {
|
||||||
|
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> get() {
|
||||||
|
return instance.tryAcquireAsync(waitTime, unit);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Boolean> tryAcquire(final int permits, final long waitTime, final TimeUnit unit) {
|
||||||
|
return reactive(new Supplier<RFuture<Boolean>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Boolean> get() {
|
||||||
|
return instance.tryAcquireAsync(permits, waitTime, unit);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Publisher<Void> reducePermits(final int permits) {
|
||||||
|
return reactive(new Supplier<RFuture<Void>>() {
|
||||||
|
@Override
|
||||||
|
public RFuture<Void> get() {
|
||||||
|
return instance.reducePermitsAsync(permits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue