From 6786a22b943879306ba6d8816af6c67b76cc4185 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 May 2018 15:05:57 +0300 Subject: [PATCH 1/5] Fixed - RemoteService sync invocations aren't thread safe. #1433 --- .../java/org/redisson/BaseRemoteService.java | 79 +++++++++++-------- .../remote/RemoteServiceResponse.java | 1 + .../redisson/RedissonRemoteServiceTest.java | 37 +++++++++ 3 files changed, 84 insertions(+), 33 deletions(-) diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 902a4b713..692e670e8 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -65,7 +65,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * @@ -568,7 +567,7 @@ public abstract class BaseRemoteService { } }); } - + private T sync(final Class remoteInterface, final RemoteInvocationOptions options) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); @@ -595,20 +594,53 @@ public abstract class BaseRemoteService { RemotePromise addPromise = new RemotePromise(requestId); RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); - addAsync(requestQueueName, request, addPromise).sync(); - RBlockingQueue responseQueue = null; - if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { - responseQueue = redisson.getBlockingQueue(responseQueueName, codec); + final RFuture ackFuture; + if (optionsCopy.isAckExpected()) { + ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false); + } else { + ackFuture = null; } - + + final RPromise responseFuture; + if (optionsCopy.isResultExpected()) { + responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false); + } else { + responseFuture = null; + } + + RFuture futureAdd = addAsync(requestQueueName, request, addPromise); + futureAdd.await(); + if (!futureAdd.isSuccess()) { + if (responseFuture != null) { + responseFuture.cancel(false); + } + if (ackFuture != null) { + ackFuture.cancel(false); + } + throw futureAdd.cause(); + } + + if (!futureAdd.get()) { + if (responseFuture != null) { + responseFuture.cancel(false); + } + if (ackFuture != null) { + ackFuture.cancel(false); + } + throw new RedisException("Task hasn't been added"); + } + // poll for the ack only if expected - if (optionsCopy.isAckExpected()) { + if (ackFuture != null) { String ackName = getAckName(requestId); - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), - TimeUnit.MILLISECONDS); + ackFuture.await(); + RemoteServiceAck ack = ackFuture.getNow(); if (ack == null) { - ack = tryPollAckAgain(optionsCopy, responseQueue, ackName); + RFuture ackFutureAttempt = + tryPollAckAgainAsync(optionsCopy, ackName, requestId); + ackFutureAttempt.await(); + ack = ackFutureAttempt.getNow(); if (ack == null) { throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); @@ -618,9 +650,9 @@ public abstract class BaseRemoteService { } // poll for the response only if expected - if (optionsCopy.isResultExpected()) { - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue - .poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); + if (responseFuture != null) { + responseFuture.awaitUninterruptibly(); + RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow(); if (response == null) { throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); @@ -638,25 +670,6 @@ public abstract class BaseRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } - private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, - RBlockingQueue responseQueue, String ackName) - throws InterruptedException { - RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[1]);" - + "return 0;" - + "end;" - + "redis.call('del', KEYS[1]);" - + "return 1;", - Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); - - ackClientsFuture.sync(); - if (ackClientsFuture.getNow()) { - return (RemoteServiceAck) responseQueue.poll(); - } - return null; - } - private RFuture tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy, String ackName, final RequestId requestId) { final RPromise promise = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java index 8a91b308f..c935ed303 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java @@ -43,6 +43,7 @@ public class RemoteServiceResponse implements RRemoteServiceResponse, Serializab this.id = id; } + @Override public String getId() { return id; } diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 488383f3d..bfa1e4d42 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -5,10 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.NotSerializableException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -16,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RFuture; +import org.redisson.api.RRemoteService; import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.annotation.RRemoteAsync; @@ -213,6 +218,38 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + + @Test + public void testConcurrentInvocations() { + ExecutorService executorService = Executors.newFixedThreadPool(2); + RRemoteService remoteService = redisson.getRemoteService(); + remoteService.register(RemoteInterface.class, new RemoteImpl()); + RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class); + + List> futures = new ArrayList<>(); + + int iterations = 1000; + AtomicBoolean bool = new AtomicBoolean(); + for (int i = 0; i < iterations; i++) { + futures.add(executorService.submit(() -> { + try { + if (ThreadLocalRandom.current().nextBoolean()) { + service.resultMethod(1L); + } else { + service.methodOverload(); + } + } catch (Exception e) { + bool.set(true); + } + })); + } + + while (!futures.stream().allMatch(Future::isDone)) {} + + assertThat(bool.get()).isFalse(); + remoteService.deregister(RemoteInterface.class); + } + @Test public void testCancelAsync() throws InterruptedException { From 02d5b0ff4fa780e09074055a40f822f9f340a760 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 May 2018 16:36:48 +0300 Subject: [PATCH 2/5] Fixed - bad argument #1 to 'len' (string expected, got boolean) #1152 --- .../java/org/redisson/RedissonMapCache.java | 120 ++++++++++-------- 1 file changed, 70 insertions(+), 50 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index dca0b53ea..5b7c88c4e 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -363,9 +363,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " redis.call('zrem', KEYS[2], lruItem); " + " redis.call('zrem', KEYS[3], lruItem); " + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[6]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[6]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -527,9 +529,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " redis.call('zrem', KEYS[2], lruItem);" + " redis.call('zrem', KEYS[3], lruItem);" + " redis.call('zrem', lastAccessTimeSetName, lruItem);" + - " local removedChannelName = KEYS[7];" + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + - " redis.call('publish', removedChannelName, msg);" + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[7];" + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + + " redis.call('publish', removedChannelName, msg);" + + "end; " + " end;" + " end" + " end;" + @@ -596,9 +600,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " redis.call('zrem', KEYS[2], lruItem); " + " redis.call('zrem', KEYS[3], lruItem); " + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[6]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[6]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -672,9 +678,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " redis.call('zrem', KEYS[2], lruItem); " + " redis.call('zrem', KEYS[3], lruItem); " + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[7]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[7]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -804,9 +812,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " redis.call('zrem', KEYS[2], lruItem); " + " redis.call('zrem', KEYS[3], lruItem); " + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[7]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[7]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -936,13 +946,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[7]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + + " local removedChannelName = KEYS[7]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -1335,13 +1347,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[7]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + + " local removedChannelName = KEYS[7]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -1385,13 +1399,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[6]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + + " local removedChannelName = KEYS[6]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -1529,13 +1545,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + - " local removedChannelName = KEYS[6]; " + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + - " redis.call('publish', removedChannelName, msg); " + + " if lruItemValue ~= false then " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + + " local removedChannelName = KEYS[6]; " + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + + " redis.call('publish', removedChannelName, msg); " + + "end; " + " end; " + " end; " + " end; " + @@ -1720,13 +1738,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do" + " if lruItem then" + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + - " redis.call('hdel', KEYS[1], lruItem);" + - " redis.call('zrem', KEYS[2], lruItem);" + - " redis.call('zrem', KEYS[3], lruItem);" + - " redis.call('zrem', lastAccessTimeSetName, lruItem);" + - " local removedChannelName = KEYS[7];" + - " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + - " redis.call('publish', removedChannelName, msg);" + + " if lruItemValue ~= false then " + + " redis.call('hdel', KEYS[1], lruItem);" + + " redis.call('zrem', KEYS[2], lruItem);" + + " redis.call('zrem', KEYS[3], lruItem);" + + " redis.call('zrem', lastAccessTimeSetName, lruItem);" + + " local removedChannelName = KEYS[7];" + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + + " redis.call('publish', removedChannelName, msg);" + + "end; " + " end;" + " end" + " end;" + From fe79bbf5f2816e4dd4f6151c063aedf205003196 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 May 2018 16:47:08 +0300 Subject: [PATCH 3/5] Serializable interface added AttributeMessage --- .../src/main/java/org/redisson/tomcat/AttributeMessage.java | 4 +++- .../src/main/java/org/redisson/tomcat/AttributeMessage.java | 4 +++- .../src/main/java/org/redisson/tomcat/AttributeMessage.java | 4 +++- .../src/main/java/org/redisson/tomcat/AttributeMessage.java | 4 +++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/AttributeMessage.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/AttributeMessage.java index 695e2bdc6..1a3f20a41 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/AttributeMessage.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/AttributeMessage.java @@ -15,12 +15,14 @@ */ package org.redisson.tomcat; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class AttributeMessage { +public class AttributeMessage implements Serializable { private String sessionId; diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/AttributeMessage.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/AttributeMessage.java index 695e2bdc6..1a3f20a41 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/AttributeMessage.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/AttributeMessage.java @@ -15,12 +15,14 @@ */ package org.redisson.tomcat; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class AttributeMessage { +public class AttributeMessage implements Serializable { private String sessionId; diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/AttributeMessage.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/AttributeMessage.java index 695e2bdc6..1a3f20a41 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/AttributeMessage.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/AttributeMessage.java @@ -15,12 +15,14 @@ */ package org.redisson.tomcat; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class AttributeMessage { +public class AttributeMessage implements Serializable { private String sessionId; diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/AttributeMessage.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/AttributeMessage.java index 695e2bdc6..1a3f20a41 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/AttributeMessage.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/AttributeMessage.java @@ -15,12 +15,14 @@ */ package org.redisson.tomcat; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class AttributeMessage { +public class AttributeMessage implements Serializable { private String sessionId; From 29dff07e97c313d0a5dbe93cbc1ea328434617b2 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 12 May 2018 11:18:38 +0300 Subject: [PATCH 4/5] Fixed - RedisTimeoutException arise during blocking command execution on RBlockingQueue and RBlockingDeque objects. #1434 --- .../java/org/redisson/command/CommandAsyncService.java | 8 ++++---- .../java/org/redisson/RedissonBlockingDequeTest.java | 9 +++++++++ .../java/org/redisson/RedissonBlockingQueueTest.java | 2 +- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 9fc9eb69d..a89956c08 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -726,10 +726,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { public void run(Timeout timeout) throws Exception { // re-connection hasn't been made // and connection is still active - if (orignalChannel == connection.getChannel() - && connection.isActive()) { - return; - } +// if (orignalChannel == connection.getChannel() +// && connection.isActive()) { +// return; +// } if (details.getAttemptPromise().trySuccess(null)) { connection.forceFastReconnectAsync(); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java index 0b12a1311..aa57a684f 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java @@ -11,6 +11,15 @@ import org.redisson.api.RBlockingDeque; public class RedissonBlockingDequeTest extends BaseTest { + @Test + public void testPollLastAndOfferFirstTo() throws InterruptedException { + RBlockingDeque blockingDeque = redisson.getBlockingDeque("blocking_deque"); + long start = System.currentTimeMillis(); + String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1050L); + assertThat(redisTask).isNull(); + } + @Test(timeout = 3000) public void testShortPoll() throws InterruptedException { RBlockingDeque queue = redisson.getBlockingDeque("queue:pollany"); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index aeba7cefc..2743521b0 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -38,7 +38,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { return redisson.getBlockingQueue("queue"); } -// @Test + @Test public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException { RedisProcess runner = new RedisRunner() .nosave() From dc6649301bce4006b2b63e06131a45d8db52e837 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 12 May 2018 11:18:49 +0300 Subject: [PATCH 5/5] Refactoring --- .../java/org/redisson/RedissonMapCache.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 5b7c88c4e..6e2f4a19a 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -946,11 +946,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " if lruItemValue ~= false then " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " local removedChannelName = KEYS[7]; " + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + " redis.call('publish', removedChannelName, msg); " + @@ -1347,11 +1347,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " if lruItemValue ~= false then " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " local removedChannelName = KEYS[7]; " + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + " redis.call('publish', removedChannelName, msg); " + @@ -1399,11 +1399,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " if lruItemValue ~= false then " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " local removedChannelName = KEYS[6]; " + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + " redis.call('publish', removedChannelName, msg); " + @@ -1545,11 +1545,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " if lruItemValue ~= false then " + - " redis.call('hdel', KEYS[1], lruItem); " + - " redis.call('zrem', KEYS[2], lruItem); " + - " redis.call('zrem', KEYS[3], lruItem); " + - " redis.call('zrem', lastAccessTimeSetName, lruItem); " + " local removedChannelName = KEYS[6]; " + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue); " + " redis.call('publish', removedChannelName, msg); " + @@ -1738,11 +1738,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " for index, lruItem in ipairs(lruItems) do" + " if lruItem then" + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + + " redis.call('hdel', KEYS[1], lruItem);" + + " redis.call('zrem', KEYS[2], lruItem);" + + " redis.call('zrem', KEYS[3], lruItem);" + + " redis.call('zrem', lastAccessTimeSetName, lruItem);" + " if lruItemValue ~= false then " + - " redis.call('hdel', KEYS[1], lruItem);" + - " redis.call('zrem', KEYS[2], lruItem);" + - " redis.call('zrem', KEYS[3], lruItem);" + - " redis.call('zrem', lastAccessTimeSetName, lruItem);" + " local removedChannelName = KEYS[7];" + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + " redis.call('publish', removedChannelName, msg);"