Fixed - RExecutorService's task response should be deleted if task was canceled #5157

pull/5147/head^2
Nikita Koksharov 2 years ago
parent e97ae41ad4
commit 448db935a4

@ -144,10 +144,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
register(remoteInterface, object, workers, commandExecutor.getServiceManager().getExecutor());
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<>(codec, commandExecutor, name);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
if (workers < 1) {

@ -16,6 +16,7 @@
package org.redisson.executor;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec;
@ -247,7 +248,9 @@ public class TasksService extends BaseRemoteService {
return CompletableFuture.completedFuture(resp);
});
}
return CompletableFuture.completedFuture(response);
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
return queue.removeAsync(response).thenApply(r -> response);
}).whenComplete((r, ex) -> {
if (ex != null) {
scheduleCancelResponseCheck(mapName, requestId);

@ -30,7 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonScheduledExecutorServiceTest extends BaseTest {
public class
RedissonScheduledExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -493,7 +496,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
cancel(future1);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
Thread.sleep(50);
assertThat(redisson.<Long>getBucket("executed1").get()).isGreaterThan(1000L);
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
@ -502,9 +506,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
RScheduledFuture<?> future2 = executor.scheduleWithFixedDelay(new ScheduledLongRepeatableTask("counter", "executed2"), 1, 2, TimeUnit.SECONDS);
Thread.sleep(6000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
executor.cancelTask(future2.getTaskId());
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
assertThat(executor.cancelTask(future2.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isGreaterThan(1000L);
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);

Loading…
Cancel
Save