fix issue EOFException when I use RLock with SerializationCodec #254

pull/255/head
Oleg Ternovoi 9 years ago
parent 4c483c6bc0
commit 0349e5c3b0

@ -175,7 +175,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return;
}
commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" +
@ -206,14 +206,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean trySetCount(long count) {
return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, count, getChannelName());
}
@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end",
Collections.<Object>singletonList(getName()), newCountMessage, getChannelName());
}

@ -312,7 +312,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override
public void unlock() {
Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R2,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " +
@ -355,7 +355,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1,
"redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true",
Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
}

@ -120,6 +120,8 @@ public interface RedisCommands {
RedisStrictCommand<List<Boolean>> SCRIPT_EXISTS = new RedisStrictCommand<List<Boolean>>("SCRIPT", "EXISTS", new ObjectListReplayDecoder<Boolean>(), new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN_R1 = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
RedisStrictCommand<Boolean> EVAL_BOOLEAN_R2 = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5);
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());
RedisStrictCommand<Long> EVAL_INTEGER = new RedisStrictCommand<Long>("EVAL");
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());

@ -40,6 +40,10 @@ public class RedisStrictCommand<T> extends RedisCommand<T> {
super(name, convertor, -1);
}
public RedisStrictCommand(String name, Convertor<T> convertor, int encodeParamIndex) {
super(name, convertor, encodeParamIndex);
}
public RedisStrictCommand(String name, String subName, Convertor<T> convertor) {
super(name, subName, convertor);
}

@ -18,13 +18,18 @@ public abstract class BaseTest {
redisson.shutdown();
}
public static Redisson createInstance() {
public static Config createConfig() {
String redisAddress = System.getProperty("redisAddress");
if (redisAddress == null) {
redisAddress = "127.0.0.1:6379";
}
Config config = new Config();
config.useSingleServer().setAddress(redisAddress);
return config;
}
public static Redisson createInstance() {
Config config = createConfig();
return Redisson.create(config);
}

@ -0,0 +1,159 @@
package org.redisson;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RLock;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
@RunWith(Parameterized.class)
public class RedissonTwoLockedThread {
@Parameterized.Parameters(name= "{index} - {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {{new JsonJacksonCodec()}, {new SerializationCodec()}});
}
@Parameterized.Parameter(0)
public Codec codec;
private Redisson redisson;
@Before
public void before() {
Config config = BaseTest.createConfig();
config.setCodec(codec);
redisson = Redisson.create(config);
}
@After
public void after() {
redisson.shutdown();
}
@Test(timeout = 3000)
public void testLock() throws InterruptedException {
final String lockName = "lock1";
final CountDownLatch startSignal = new CountDownLatch(1);
final CountDownLatch testSignal = new CountDownLatch(1);
final CountDownLatch completeSignal = new CountDownLatch(2);
System.out.println("configure");
final long millis = System.currentTimeMillis();
new Thread() {
@Override
public void run() {
try {
startSignal.await();
RLock lock = redisson.getLock(lockName);
System.out.println("1. getlock " + lock.getName() + " - " + Thread.currentThread().getId());
lock.lock();
System.out.println("1. lock " + lock.getName() + " - " + Thread.currentThread().getId());
testSignal.countDown();
Thread.sleep(500);
lock.unlock();
System.out.println("1. unlock " + lock.getName() + " - " + Thread.currentThread().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
completeSignal.countDown();
}
}.start();
new Thread() {
@Override
public void run() {
try {
testSignal.await();
RLock lock = redisson.getLock(lockName);
System.out.println("2. getlock " + lock.getName() + " - " + Thread.currentThread().getId());
lock.lock();
System.out.println("2. lock " + lock.getName() + " - " + Thread.currentThread().getId());
long current = System.currentTimeMillis();
Assert.assertTrue("current=" + current + ", millis=" + millis, current - millis >= 500);
Thread.sleep(500);
lock.unlock();
System.out.println("2. unlock " + lock.getName() + " - " + Thread.currentThread().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
completeSignal.countDown();
}
}.start();
System.out.println("start");
startSignal.countDown();
completeSignal.await();
System.out.println("complete");
}
@Test(timeout = 3000)
public void testCountDown() throws InterruptedException {
final String countDownName = getClass().getName() + ":countDown#1";
final CountDownLatch startSignal = new CountDownLatch(1);
final CountDownLatch testSignal = new CountDownLatch(1);
final CountDownLatch completeSignal = new CountDownLatch(2);
System.out.println("configure");
final long millis = System.currentTimeMillis();
new Thread() {
@Override
public void run() {
try {
startSignal.await();
RCountDownLatch countDownLatch = redisson.getCountDownLatch(countDownName);
System.out.println("1. getCountDownLatch " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
countDownLatch.trySetCount(1);
System.out.println("1. trySetCount " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
Thread.sleep(500);
testSignal.countDown();
Thread.sleep(500);
System.out.println("1. sleep " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
countDownLatch.countDown();
System.out.println("1. countDown " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
completeSignal.countDown();
}
}.start();
new Thread() {
@Override
public void run() {
try {
testSignal.await();
RCountDownLatch countDownLatch = redisson.getCountDownLatch(countDownName);
System.out.println("2. getCountDownLatch " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
countDownLatch.await();
System.out.println("2. await " + countDownLatch.getName() + " - " + Thread.currentThread().getId());
long current = System.currentTimeMillis();
Assert.assertTrue("current=" + current + ", millis=" + millis, (current - millis) >= 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
completeSignal.countDown();
}
}.start();
System.out.println("start");
startSignal.countDown();
completeSignal.await();
System.out.println("complete");
}
}
Loading…
Cancel
Save