RReadWriteLockReactive implemented #963

pull/970/head
Nikita 8 years ago
parent 690962a954
commit b27acc8e3d

@ -40,6 +40,7 @@ import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCacheReactive;
@ -69,6 +70,7 @@ import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
@ -101,6 +103,11 @@ public class RedissonReactive implements RedissonReactiveClient {
codecProvider = config.getCodecProvider();
}
@Override
public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name, id);
}
@Override
public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name, id);

@ -26,7 +26,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;
import io.netty.util.concurrent.Future;
@ -40,16 +40,13 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonReadLock extends RedissonLock implements RLock {
private final CommandExecutor commandExecutor;
protected RedissonReadLock(CommandExecutor commandExecutor, String name, UUID id) {
public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
@Override
String getChannelName() {
return "redisson_rwlock__{" + getName() + "}";
return prefixName("redisson_rwlock", getName());
}
String getWriteLockName(long threadId) {
@ -159,7 +156,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override
public boolean isLocked() {
String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
String res = get(future);
return "read".equals(res);
}

@ -20,7 +20,7 @@ import java.util.concurrent.locks.Lock;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
/**
* A {@code ReadWriteLock} maintains a pair of associated {@link
@ -38,11 +38,9 @@ import org.redisson.command.CommandExecutor;
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
private final UUID id;
private final CommandExecutor commandExecutor;
RedissonReadWriteLock(CommandExecutor commandExecutor, String name, UUID id) {
public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
}

@ -26,7 +26,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;
import io.netty.util.concurrent.Future;
@ -40,16 +40,13 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonWriteLock extends RedissonLock implements RLock {
private final CommandExecutor commandExecutor;
protected RedissonWriteLock(CommandExecutor commandExecutor, String name, UUID id) {
protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
@Override
String getChannelName() {
return "redisson_rwlock__{" + getName() + "}";
return prefixName("redisson_rwlock", getName());
}
@Override
@ -144,7 +141,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
@Override
public boolean isLocked() {
String res = commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
String res = get(future);
return "write".equals(res);
}

@ -0,0 +1,49 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.locks.Lock;
/**
* A {@code ReadWriteLock} maintains a pair of associated {@link
* Lock locks}, one for read-only operations and one for writing.
* The {@link #readLock read lock} may be held simultaneously by
* multiple reader threads, so long as there are no writers. The
* {@link #writeLock write lock} is exclusive.
*
* Works in non-fair mode. Therefore order of read and write
* locking is unspecified.
*
* @author Nikita Koksharov
*
*/
public interface RReadWriteLockReactive extends RExpirableReactive {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
RLockReactive readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
RLockReactive writeLock();
}

@ -30,6 +30,14 @@ import org.redisson.config.Config;
*/
public interface RedissonReactiveClient {
/**
* Returns readWriteLock instance by name.
*
* @param name - name of object
* @return Lock object
*/
RReadWriteLockReactive getReadWriteLock(String name);
/**
* Returns lock instance by name.
* <p>

@ -23,6 +23,7 @@ import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
@ -38,9 +39,13 @@ public class RedissonLockReactive extends RedissonExpirableReactive implements R
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, UUID id) {
super(connectionManager, name);
instance = new RedissonLock(connectionManager, name, id);
instance = createLock(connectionManager, name, id);
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return new RedissonLock(commandExecutor, name, id);
}
@Override
public Publisher<Boolean> forceUnlock() {
return reactive(new Supplier<RFuture<Boolean>>() {

@ -0,0 +1,65 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.UUID;
import org.redisson.RedissonReadWriteLock;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive {
private final RReadWriteLock instance;
private final UUID id;
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.id = id;
this.instance = new RedissonReadWriteLock(commandExecutor, name, id);
}
@Override
public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return instance.readLock();
}
};
}
@Override
public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName(), id) {
@Override
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return instance.writeLock();
}
};
}
}
Loading…
Cancel
Save