Add RLockReactive #962

pull/970/head
Nikita 8 years ago
parent 1550bc4785
commit b81840eb08

@ -30,7 +30,7 @@ import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.LockPubSub;
import org.slf4j.Logger;
@ -63,9 +63,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
protected static final LockPubSub PUBSUB = new LockPubSub();
final CommandExecutor commandExecutor;
final CommandAsyncExecutor commandExecutor;
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
@ -417,12 +417,14 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(Thread.currentThread().getId()));
return get(future);
}
@Override
public int getHoldCount() {
Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
RFuture<Long> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName(Thread.currentThread().getId()));
Long res = get(future);
if (res == null) {
return 0;
}

@ -35,6 +35,7 @@ import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RLexSortedSetReactive;
import org.redisson.api.RListReactive;
import org.redisson.api.RLockReactive;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RPatternTopicReactive;
@ -63,6 +64,7 @@ import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListReactive;
import org.redisson.reactive.RedissonLockReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive;
@ -99,6 +101,10 @@ public class RedissonReactive implements RedissonReactiveClient {
codecProvider = config.getCodecProvider();
}
@Override
public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name, id);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {

@ -0,0 +1,54 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RLockReactive extends RExpirableReactive {
Publisher<Boolean> forceUnlock();
Publisher<Void> unlock();
Publisher<Void> unlock(long threadId);
Publisher<Boolean> tryLock();
Publisher<Void> lock();
Publisher<Void> lock(long threadId);
Publisher<Void> lock(long leaseTime, TimeUnit unit);
Publisher<Void> lock(long leaseTime, TimeUnit unit, long threadId);
Publisher<Boolean> tryLock(long threadId);
Publisher<Boolean> tryLock(long waitTime, TimeUnit unit);
Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
Publisher<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit, long threadId);
}

@ -30,6 +30,16 @@ import org.redisson.config.Config;
*/
public interface RedissonReactiveClient {
/**
* Returns lock instance by name.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantee an acquire order by threads.
*
* @param name - name of object
* @return Lock object
*/
RLockReactive getLock(String name);
/**
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.

@ -38,8 +38,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
ConnectionManager getConnectionManager();
<T, R> Publisher<R> evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object ... params);

@ -0,0 +1,164 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive {
private final RLockAsync instance;
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
super(connectionManager, name);
instance = new RedissonLock(connectionManager, name, id);
}
@Override
public Publisher<Boolean> forceUnlock() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.forceUnlockAsync();
}
});
}
@Override
public Publisher<Void> unlock() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.unlockAsync();
}
});
}
@Override
public Publisher<Void> unlock(final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.unlockAsync(threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync();
}
});
}
@Override
public Publisher<Void> lock() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync();
}
});
}
@Override
public Publisher<Void> lock(final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(threadId);
}
});
}
@Override
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(leaseTime, unit);
}
});
}
@Override
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit, final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(leaseTime, unit, threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long threadId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, leaseTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, leaseTime, unit, threadId);
}
});
}
}
Loading…
Cancel
Save