diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index a010bdb96..833b9e934 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -144,10 +144,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS register(remoteInterface, object, workers, commandExecutor.getServiceManager().getExecutor()); } - private RBlockingQueue getBlockingQueue(String name, Codec codec) { - return new RedissonBlockingQueue<>(codec, commandExecutor, name); - } - @Override public void register(Class remoteInterface, T object, int workers, ExecutorService executor) { if (workers < 1) { diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index c8b0f634b..7ee8d19c1 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -287,8 +287,18 @@ public class RedisConnection implements RedisCommands { fastReconnect.complete(null); fastReconnect = null; } - - private void close() { + + public void close() { + try { + closeAsync().sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw e; + } + } + + private void closeInternal() { CommandData command = getCurrentCommand(); if ((command != null && command.isBlockingCommand()) || !connectionPromise.isDone()) { @@ -304,7 +314,7 @@ public class RedisConnection implements RedisCommands { public CompletableFuture forceFastReconnectAsync() { CompletableFuture promise = new CompletableFuture(); fastReconnect = promise; - close(); + closeInternal(); return promise; } @@ -320,7 +330,7 @@ public class RedisConnection implements RedisCommands { public ChannelFuture closeIdleAsync() { status = Status.CLOSED_IDLE; - close(); + closeInternal(); return channel.closeFuture(); } @@ -330,7 +340,7 @@ public class RedisConnection implements RedisCommands { public ChannelFuture closeAsync() { status = Status.CLOSED; - close(); + closeInternal(); return channel.closeFuture(); } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 468f1c3b5..b45681b39 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -16,6 +16,7 @@ package org.redisson.executor; import org.redisson.RedissonExecutorService; +import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.client.codec.Codec; @@ -247,7 +248,9 @@ public class TasksService extends BaseRemoteService { return CompletableFuture.completedFuture(resp); }); } - return CompletableFuture.completedFuture(response); + + RBlockingQueueAsync queue = getBlockingQueue(responseQueueName, codec); + return queue.removeAsync(response).thenApply(r -> response); }).whenComplete((r, ex) -> { if (ex != null) { scheduleCancelResponseCheck(mapName, requestId); diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java index 39404536b..ab28d224a 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java @@ -20,7 +20,9 @@ import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonMap; +import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RemoteInvocationOptions; @@ -200,4 +202,8 @@ public abstract class BaseRemoteService { return result; } + protected RBlockingQueue getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueue<>(codec, commandExecutor, name); + } + } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index b240058fb..7cf841706 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -30,7 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonScheduledExecutorServiceTest extends BaseTest { +public class + + +RedissonScheduledExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -528,7 +531,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); cancel(future1); - assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); + Thread.sleep(50); + assertThat(redisson.getBucket("executed1").get()).isGreaterThan(1000L); Thread.sleep(3000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); @@ -537,9 +541,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { RScheduledFuture future2 = executor.scheduleWithFixedDelay(new ScheduledLongRepeatableTask("counter", "executed2"), 1, 2, TimeUnit.SECONDS); Thread.sleep(6000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); - - executor.cancelTask(future2.getTaskId()); - assertThat(redisson.getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE); + + assertThat(executor.cancelTask(future2.getTaskId())).isTrue(); + assertThat(redisson.getBucket("executed2").get()).isGreaterThan(1000L); + Thread.sleep(3000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);