From e97ae41ad43fd6929d04aa74cc05225c173a37e6 Mon Sep 17 00:00:00 2001
From: Nikita Koksharov <nkoksharov@redisson.pro>
Date: Thu, 6 Jul 2023 08:10:39 +0300
Subject: [PATCH 1/3] Fixed - RedisConnection.close() method has private
 visibility. #5154

---
 .../org/redisson/client/RedisConnection.java  | 20 ++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java
index c8b0f634b..7ee8d19c1 100644
--- a/redisson/src/main/java/org/redisson/client/RedisConnection.java
+++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java
@@ -287,8 +287,18 @@ public class RedisConnection implements RedisCommands {
         fastReconnect.complete(null);
         fastReconnect = null;
     }
-    
-    private void close() {
+
+    public void close() {
+        try {
+            closeAsync().sync();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void closeInternal() {
         CommandData<?, ?> command = getCurrentCommand();
         if ((command != null && command.isBlockingCommand())
                     || !connectionPromise.isDone()) {
@@ -304,7 +314,7 @@ public class RedisConnection implements RedisCommands {
     public CompletableFuture<Void> forceFastReconnectAsync() {
         CompletableFuture<Void> promise = new CompletableFuture<Void>();
         fastReconnect = promise;
-        close();
+        closeInternal();
         return promise;
     }
 
@@ -320,7 +330,7 @@ public class RedisConnection implements RedisCommands {
 
     public ChannelFuture closeIdleAsync() {
         status = Status.CLOSED_IDLE;
-        close();
+        closeInternal();
         return channel.closeFuture();
     }
 
@@ -330,7 +340,7 @@ public class RedisConnection implements RedisCommands {
 
     public ChannelFuture closeAsync() {
         status = Status.CLOSED;
-        close();
+        closeInternal();
         return channel.closeFuture();
     }
 

From 448db935a4a2c82e8a76a48d74f7659f115978c6 Mon Sep 17 00:00:00 2001
From: Nikita Koksharov <nkoksharov@redisson.pro>
Date: Thu, 6 Jul 2023 10:36:18 +0300
Subject: [PATCH 2/3] Fixed - RExecutorService's task response should be
 deleted if task was canceled #5157

---
 .../java/org/redisson/RedissonRemoteService.java  |  4 ----
 .../java/org/redisson/executor/TasksService.java  |  5 ++++-
 .../RedissonScheduledExecutorServiceTest.java     | 15 ++++++++++-----
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java
index a010bdb96..833b9e934 100644
--- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java
+++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java
@@ -144,10 +144,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
         register(remoteInterface, object, workers, commandExecutor.getServiceManager().getExecutor());
     }
 
-    private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
-        return new RedissonBlockingQueue<>(codec, commandExecutor, name);
-    }
-    
     @Override
     public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
         if (workers < 1) {
diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java
index 468f1c3b5..b45681b39 100644
--- a/redisson/src/main/java/org/redisson/executor/TasksService.java
+++ b/redisson/src/main/java/org/redisson/executor/TasksService.java
@@ -16,6 +16,7 @@
 package org.redisson.executor;
 
 import org.redisson.RedissonExecutorService;
+import org.redisson.api.RBlockingQueueAsync;
 import org.redisson.api.RFuture;
 import org.redisson.api.RMap;
 import org.redisson.client.codec.Codec;
@@ -247,7 +248,9 @@ public class TasksService extends BaseRemoteService {
                         return CompletableFuture.completedFuture(resp);
                     });
                 }
-                return CompletableFuture.completedFuture(response);
+
+                RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
+                return queue.removeAsync(response).thenApply(r -> response);
             }).whenComplete((r, ex) -> {
                 if (ex != null) {
                     scheduleCancelResponseCheck(mapName, requestId);
diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java
index 74be801b8..d07066ce1 100644
--- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java
+++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java
@@ -30,7 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class RedissonScheduledExecutorServiceTest extends BaseTest {
+public class
+
+
+RedissonScheduledExecutorServiceTest extends BaseTest {
 
     private static RedissonNode node;
     
@@ -493,7 +496,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
         assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
         
         cancel(future1);
-        assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
+        Thread.sleep(50);
+        assertThat(redisson.<Long>getBucket("executed1").get()).isGreaterThan(1000L);
 
         Thread.sleep(3000);
         assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
@@ -502,9 +506,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
         RScheduledFuture<?> future2 = executor.scheduleWithFixedDelay(new ScheduledLongRepeatableTask("counter", "executed2"), 1, 2, TimeUnit.SECONDS);
         Thread.sleep(6000);
         assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
-        
-        executor.cancelTask(future2.getTaskId());
-        assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
+
+        assertThat(executor.cancelTask(future2.getTaskId())).isTrue();
+        assertThat(redisson.<Long>getBucket("executed2").get()).isGreaterThan(1000L);
+
 
         Thread.sleep(3000);
         assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);

From 655cb9f8ea9534463fd986141c27dcaa89b9fe86 Mon Sep 17 00:00:00 2001
From: Nikita Koksharov <nkoksharov@redisson.pro>
Date: Thu, 6 Jul 2023 10:36:27 +0300
Subject: [PATCH 3/3] Fixed - RExecutorService's task response should be
 deleted if task was canceled #5157

---
 .../main/java/org/redisson/remote/BaseRemoteService.java    | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java
index 39404536b..ab28d224a 100644
--- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java
+++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java
@@ -20,7 +20,9 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.CharsetUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.redisson.RedissonBlockingQueue;
 import org.redisson.RedissonMap;
+import org.redisson.api.RBlockingQueue;
 import org.redisson.api.RFuture;
 import org.redisson.api.RMap;
 import org.redisson.api.RemoteInvocationOptions;
@@ -200,4 +202,8 @@ public abstract class BaseRemoteService {
         return result;
     }
 
+    protected <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
+        return new RedissonBlockingQueue<>(codec, commandExecutor, name);
+    }
+
 }