diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index a010bdb96..833b9e934 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -144,10 +144,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS register(remoteInterface, object, workers, commandExecutor.getServiceManager().getExecutor()); } - private RBlockingQueue getBlockingQueue(String name, Codec codec) { - return new RedissonBlockingQueue<>(codec, commandExecutor, name); - } - @Override public void register(Class remoteInterface, T object, int workers, ExecutorService executor) { if (workers < 1) { diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 468f1c3b5..b45681b39 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -16,6 +16,7 @@ package org.redisson.executor; import org.redisson.RedissonExecutorService; +import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.client.codec.Codec; @@ -247,7 +248,9 @@ public class TasksService extends BaseRemoteService { return CompletableFuture.completedFuture(resp); }); } - return CompletableFuture.completedFuture(response); + + RBlockingQueueAsync queue = getBlockingQueue(responseQueueName, codec); + return queue.removeAsync(response).thenApply(r -> response); }).whenComplete((r, ex) -> { if (ex != null) { scheduleCancelResponseCheck(mapName, requestId); diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 74be801b8..d07066ce1 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -30,7 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; -public class RedissonScheduledExecutorServiceTest extends BaseTest { +public class + + +RedissonScheduledExecutorServiceTest extends BaseTest { private static RedissonNode node; @@ -493,7 +496,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); cancel(future1); - assertThat(redisson.getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE); + Thread.sleep(50); + assertThat(redisson.getBucket("executed1").get()).isGreaterThan(1000L); Thread.sleep(3000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); @@ -502,9 +506,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { RScheduledFuture future2 = executor.scheduleWithFixedDelay(new ScheduledLongRepeatableTask("counter", "executed2"), 1, 2, TimeUnit.SECONDS); Thread.sleep(6000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3); - - executor.cancelTask(future2.getTaskId()); - assertThat(redisson.getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE); + + assertThat(executor.cancelTask(future2.getTaskId())).isTrue(); + assertThat(redisson.getBucket("executed2").get()).isGreaterThan(1000L); + Thread.sleep(3000); assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);