From 0349e5c3b0eaf665041aa4139960e1897db67bb8 Mon Sep 17 00:00:00 2001 From: Oleg Ternovoi Date: Fri, 2 Oct 2015 15:27:55 +0600 Subject: [PATCH] fix issue EOFException when I use RLock with SerializationCodec #254 --- .../org/redisson/RedissonCountDownLatch.java | 6 +- src/main/java/org/redisson/RedissonLock.java | 4 +- .../client/protocol/RedisCommands.java | 2 + .../client/protocol/RedisStrictCommand.java | 4 + src/test/java/org/redisson/BaseTest.java | 7 +- .../org/redisson/RedissonTwoLockedThread.java | 159 ++++++++++++++++++ 6 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 src/test/java/org/redisson/RedissonTwoLockedThread.java diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 6223555b7..fc8fbe5eb 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -175,7 +175,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return; } - commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, + commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R1, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', ARGV[2], ARGV[1]) end;" + @@ -206,14 +206,14 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown @Override public boolean trySetCount(long count) { - return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R1, "if redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[1], ARGV[2]); redis.call('publish', ARGV[3], ARGV[1]); return true else return false end", Collections.singletonList(getName()), newCountMessage, count, getChannelName()); } @Override public Future deleteAsync() { - return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, "if redis.call('del', KEYS[1]) == 1 then redis.call('publish', ARGV[2], ARGV[1]); return true else return false end", Collections.singletonList(getName()), newCountMessage, getChannelName()); } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index e18e20fb4..513aa0fbd 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -312,7 +312,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void unlock() { - Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN, + Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN_R2, "local v = redis.call('get', KEYS[1]); " + "if (v == false) then " + " redis.call('publish', ARGV[4], ARGV[2]); " + @@ -355,7 +355,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { private Future forceUnlockAsync() { stopRefreshTask(); - return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN_R1, "redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true", Collections.singletonList(getName()), unlockMessage, getChannelName()); } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 0e73782e7..eefeb69e5 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -120,6 +120,8 @@ public interface RedisCommands { RedisStrictCommand> SCRIPT_EXISTS = new RedisStrictCommand>("SCRIPT", "EXISTS", new ObjectListReplayDecoder(), new BooleanReplayConvertor()); RedisStrictCommand EVAL_BOOLEAN = new RedisStrictCommand("EVAL", new BooleanReplayConvertor()); + RedisStrictCommand EVAL_BOOLEAN_R1 = new RedisStrictCommand("EVAL", new BooleanReplayConvertor(), 4); + RedisStrictCommand EVAL_BOOLEAN_R2 = new RedisStrictCommand("EVAL", new BooleanReplayConvertor(), 5); RedisStrictCommand EVAL_STRING = new RedisStrictCommand("EVAL", new StringReplayDecoder()); RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL"); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); diff --git a/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java b/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java index 631645079..b929e4e66 100644 --- a/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java @@ -40,6 +40,10 @@ public class RedisStrictCommand extends RedisCommand { super(name, convertor, -1); } + public RedisStrictCommand(String name, Convertor convertor, int encodeParamIndex) { + super(name, convertor, encodeParamIndex); + } + public RedisStrictCommand(String name, String subName, Convertor convertor) { super(name, subName, convertor); } diff --git a/src/test/java/org/redisson/BaseTest.java b/src/test/java/org/redisson/BaseTest.java index 0a4cee849..0e30253fc 100644 --- a/src/test/java/org/redisson/BaseTest.java +++ b/src/test/java/org/redisson/BaseTest.java @@ -18,13 +18,18 @@ public abstract class BaseTest { redisson.shutdown(); } - public static Redisson createInstance() { + public static Config createConfig() { String redisAddress = System.getProperty("redisAddress"); if (redisAddress == null) { redisAddress = "127.0.0.1:6379"; } Config config = new Config(); config.useSingleServer().setAddress(redisAddress); + return config; + } + + public static Redisson createInstance() { + Config config = createConfig(); return Redisson.create(config); } diff --git a/src/test/java/org/redisson/RedissonTwoLockedThread.java b/src/test/java/org/redisson/RedissonTwoLockedThread.java new file mode 100644 index 000000000..8a4ae2c5c --- /dev/null +++ b/src/test/java/org/redisson/RedissonTwoLockedThread.java @@ -0,0 +1,159 @@ +package org.redisson; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.redisson.client.codec.Codec; +import org.redisson.codec.JsonJacksonCodec; +import org.redisson.codec.SerializationCodec; +import org.redisson.core.RCountDownLatch; +import org.redisson.core.RLock; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; + +@RunWith(Parameterized.class) +public class RedissonTwoLockedThread { + + @Parameterized.Parameters(name= "{index} - {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] {{new JsonJacksonCodec()}, {new SerializationCodec()}}); + } + + @Parameterized.Parameter(0) + public Codec codec; + + private Redisson redisson; + + @Before + public void before() { + Config config = BaseTest.createConfig(); + config.setCodec(codec); + redisson = Redisson.create(config); + } + + @After + public void after() { + redisson.shutdown(); + } + + @Test(timeout = 3000) + public void testLock() throws InterruptedException { + final String lockName = "lock1"; + + final CountDownLatch startSignal = new CountDownLatch(1); + final CountDownLatch testSignal = new CountDownLatch(1); + final CountDownLatch completeSignal = new CountDownLatch(2); + + System.out.println("configure"); + + final long millis = System.currentTimeMillis(); + + new Thread() { + @Override + public void run() { + try { + startSignal.await(); + RLock lock = redisson.getLock(lockName); + System.out.println("1. getlock " + lock.getName() + " - " + Thread.currentThread().getId()); + lock.lock(); + System.out.println("1. lock " + lock.getName() + " - " + Thread.currentThread().getId()); + testSignal.countDown(); + Thread.sleep(500); + lock.unlock(); + System.out.println("1. unlock " + lock.getName() + " - " + Thread.currentThread().getId()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completeSignal.countDown(); + } + }.start(); + + new Thread() { + @Override + public void run() { + try { + testSignal.await(); + RLock lock = redisson.getLock(lockName); + System.out.println("2. getlock " + lock.getName() + " - " + Thread.currentThread().getId()); + lock.lock(); + System.out.println("2. lock " + lock.getName() + " - " + Thread.currentThread().getId()); + long current = System.currentTimeMillis(); + Assert.assertTrue("current=" + current + ", millis=" + millis, current - millis >= 500); + Thread.sleep(500); + lock.unlock(); + System.out.println("2. unlock " + lock.getName() + " - " + Thread.currentThread().getId()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completeSignal.countDown(); + } + }.start(); + + System.out.println("start"); + startSignal.countDown(); + completeSignal.await(); + System.out.println("complete"); + } + + @Test(timeout = 3000) + public void testCountDown() throws InterruptedException { + final String countDownName = getClass().getName() + ":countDown#1"; + + final CountDownLatch startSignal = new CountDownLatch(1); + final CountDownLatch testSignal = new CountDownLatch(1); + final CountDownLatch completeSignal = new CountDownLatch(2); + + System.out.println("configure"); + + final long millis = System.currentTimeMillis(); + + new Thread() { + @Override + public void run() { + try { + startSignal.await(); + RCountDownLatch countDownLatch = redisson.getCountDownLatch(countDownName); + System.out.println("1. getCountDownLatch " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + countDownLatch.trySetCount(1); + System.out.println("1. trySetCount " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + Thread.sleep(500); + testSignal.countDown(); + Thread.sleep(500); + System.out.println("1. sleep " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + countDownLatch.countDown(); + System.out.println("1. countDown " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completeSignal.countDown(); + } + }.start(); + + new Thread() { + @Override + public void run() { + try { + testSignal.await(); + RCountDownLatch countDownLatch = redisson.getCountDownLatch(countDownName); + System.out.println("2. getCountDownLatch " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + countDownLatch.await(); + System.out.println("2. await " + countDownLatch.getName() + " - " + Thread.currentThread().getId()); + long current = System.currentTimeMillis(); + Assert.assertTrue("current=" + current + ", millis=" + millis, (current - millis) >= 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completeSignal.countDown(); + } + }.start(); + + System.out.println("start"); + startSignal.countDown(); + completeSignal.await(); + System.out.println("complete"); + } +}