RedissonLock is now reentrant

pull/6/head
Nikita 11 years ago
parent 5847d36548
commit 555fadab89

@ -24,6 +24,11 @@ public class JsonCodec extends RedisCodec<Object, Object> {
public JsonCodec() {
objectMapper.setSerializationInclusion(Include.NON_NULL);
objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -27,13 +28,73 @@ import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
// TODO make it reentrant
public class RedissonLock implements RLock {
public static class LockValue {
private UUID id;
private Long threadId;
private int counter;
public LockValue() {
}
public LockValue(UUID id, Long threadId) {
super();
this.id = id;
this.threadId = threadId;
}
public void decCounter() {
counter--;
}
public void incCounter() {
counter++;
}
public int getCounter() {
return counter;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((threadId == null) ? 0 : threadId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LockValue other = (LockValue) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (threadId == null) {
if (other.threadId != null)
return false;
} else if (!threadId.equals(other.threadId))
return false;
return true;
}
}
private final Redisson redisson;
private final RedisPubSubConnection<Object, Object> pubSubConnection;
private final RedisConnection<Object, Object> connection;
private final UUID id = UUID.randomUUID();
private final String groupName = "redisson_lock";
private final String name;
@ -91,6 +152,10 @@ public class RedissonLock implements RLock {
}
}
private String getKeyName() {
return groupName + name;
}
private String getChannelName() {
return groupName + name;
}
@ -105,7 +170,18 @@ public class RedissonLock implements RLock {
@Override
public boolean tryLock() {
Boolean res = connection.hsetnx(groupName, name, "1");
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();
Boolean res = connection.setnx(getKeyName(), currentLock);
if (!res) {
LockValue lock = (LockValue) connection.get(getKeyName());
if (lock.equals(currentLock)) {
lock.incCounter();
connection.set(getKeyName(), lock);
return true;
}
}
return res;
}
@ -127,8 +203,21 @@ public class RedissonLock implements RLock {
@Override
public void unlock() {
connection.hdel(groupName, name);
connection.publish(getChannelName(), unlockMessage);
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
LockValue lock = (LockValue) connection.get(getKeyName());
if (lock.equals(currentLock)) {
if (lock.getCounter() > 1) {
lock.decCounter();
connection.set(getKeyName(), lock);
} else {
connection.del(getKeyName());
connection.publish(getChannelName(), unlockMessage);
}
} else {
// TODO throw error
}
}
@Override

@ -21,6 +21,19 @@ public class RedissonLockTest extends BaseConcurrentTest {
redisson.shutdown();
}
@Test
public void testReentrancy() {
Redisson redisson = Redisson.create();
Lock lock = redisson.getLock("lock1");
lock.lock();
lock.lock();
lock.unlock();
lock.unlock();
redisson.shutdown();
}
@Test
public void testConcurrency_SingleInstance() throws InterruptedException {
final AtomicInteger lockedCounter = new AtomicInteger();

Loading…
Cancel
Save