ReadWriteLock implementation added. #206

pull/337/head
Nikita 9 years ago
parent 2436dcc52f
commit 2447a4558a

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.Promise;

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -50,6 +51,7 @@ import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
@ -141,6 +143,11 @@ public class Redisson implements RedissonClient {
return new RedissonReactive(config);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(commandExecutor, name, id);
}
@Override
public <V> RBucket<V> getBucket(String name) {
return new RedissonBucket<V>(commandExecutor, name);

@ -16,6 +16,7 @@
package org.redisson;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import org.redisson.client.codec.Codec;
import org.redisson.core.ClusterNode;
@ -37,6 +38,7 @@ import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
@ -52,6 +54,8 @@ import org.redisson.core.RTopic;
*/
public interface RedissonClient {
RReadWriteLock getReadWriteLock(String name);
/**
* Returns map-based cache instance with eviction support by name
* using provided codec for both cache keys and values.

@ -0,0 +1,151 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RLock;
import io.netty.util.concurrent.Future;
/**
* Lock will be removed automatically if client disconnects.
*
* @author Nikita Koksharov
*
*/
public class RedissonReadLock extends RedissonLock implements RLock {
private final CommandExecutor commandExecutor;
protected RedissonReadLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
private String getLockName() {
return id + ":" + Thread.currentThread().getId();
}
String getChannelName() {
return "redisson_rwlock__{" + getName() + "}";
}
Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], KEYS[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') then " +
"redis.call('hincrby', KEYS[1], KEYS[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getLockName()), internalLockLeaseTime);
}
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"return true; " +
"else if (mode == 'read') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " +
"if (lockExists == false) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return false; " +
"else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; "+
"end; " +
"end; " +
"end; " +
"return nil; ",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
stopRefreshTask();
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; " +
"else " +
"return false; " +
"end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage);
}
@Override
public boolean isLocked() {
String res = commandExecutor.read(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
return "read".equals(res);
}
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName());
}
@Override
public int getHoldCount() {
Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName());
if (res == null) {
return 0;
}
return res.intValue();
}
}

@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.UUID;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RLock;
import org.redisson.core.RReadWriteLock;
public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
public static final Long unlockMessage = 0L;
private final UUID id;
private final CommandExecutor commandExecutor;
RedissonReadWriteLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
}
@Override
public RLock readLock() {
return new RedissonReadLock(commandExecutor, getName(), id);
}
@Override
public RLock writeLock() {
return new RedissonWriteLock(commandExecutor, getName(), id);
}
}

@ -0,0 +1,153 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RLock;
import io.netty.util.concurrent.Future;
/**
* Lock will be removed automatically if client disconnects.
*
* @author Nikita Koksharov
*
*/
public class RedissonWriteLock extends RedissonLock implements RLock {
private final CommandExecutor commandExecutor;
protected RedissonWriteLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
private String getLockName() {
return id + ":" + Thread.currentThread().getId();
}
String getChannelName() {
return "redisson_rwlock__{" + getName() + "}";
}
Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], KEYS[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], KEYS[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], KEYS[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName(), getLockName()), internalLockLeaseTime);
}
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"return true; " +
"else if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], KEYS[2]); " +
"if (lockExists == false) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], KEYS[2], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return false; " +
"else " +
"redis.call('hdel', KEYS[1], KEYS[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage, internalLockLeaseTime);
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
stopRefreshTask();
}
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[3], ARGV[1]); " +
"end; " +
"return true; " +
"else " +
"return false; " +
"end;",
Arrays.<Object>asList(getName(), getLockName(), getChannelName()), RedissonReadWriteLock.unlockMessage);
}
@Override
public boolean isLocked() {
String res = commandExecutor.read(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
return "write".equals(res);
}
@Override
public boolean isHeldByCurrentThread() {
return commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName());
}
@Override
public int getHoldCount() {
Long res = commandExecutor.read(getName(), LongCodec.INSTANCE, RedisCommands.HGET, getName(), getLockName());
if (res == null) {
return 0;
}
return res.intValue();
}
}

@ -0,0 +1,35 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.locks.ReadWriteLock;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.
* Use {@link RReadWriteLock#getHoldCount()} to get a holds count.
*
* @author Nikita Koksharov
*
*/
public interface RReadWriteLock extends ReadWriteLock, RExpirable {
RLock readLock();
RLock writeLock();
}

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import org.redisson.RedissonCountDownLatch;

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import org.redisson.RedissonLock;

@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentMap;

Loading…
Cancel
Save