From 27791dfda553179b8d9643df9e7b6567483bb876 Mon Sep 17 00:00:00 2001 From: seakider Date: Thu, 9 Jan 2025 19:58:07 +0800 Subject: [PATCH 1/3] Feature - RObservable for Lock object listener #3854 Signed-off-by: seakider --- .../java/org/redisson/RedissonMultiLock.java | 20 +++++++ .../src/main/java/org/redisson/api/RLock.java | 2 +- .../java/org/redisson/api/RLockAsync.java | 2 +- .../java/org/redisson/api/RLockReactive.java | 2 +- .../main/java/org/redisson/api/RLockRx.java | 2 +- .../java/org/redisson/api/RObservable.java | 43 ++++++++++++++ .../org/redisson/api/RObservableAsync.java | 42 ++++++++++++++ .../org/redisson/api/RObservableReactive.java | 45 +++++++++++++++ .../java/org/redisson/api/RObservableRx.java | 46 +++++++++++++++ .../redisson/RedissonLockReactiveTest.java | 56 +++++++++++++++++++ .../java/org/redisson/RedissonLockRxTest.java | 56 +++++++++++++++++++ .../java/org/redisson/RedissonLockTest.java | 53 +++++++++++++++++- 12 files changed, 364 insertions(+), 5 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RObservable.java create mode 100644 redisson/src/main/java/org/redisson/api/RObservableAsync.java create mode 100644 redisson/src/main/java/org/redisson/api/RObservableReactive.java create mode 100644 redisson/src/main/java/org/redisson/api/RObservableRx.java diff --git a/redisson/src/main/java/org/redisson/RedissonMultiLock.java b/redisson/src/main/java/org/redisson/RedissonMultiLock.java index 774de9136..f065c1686 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiLock.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.ObjectListener; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RLockAsync; @@ -514,4 +515,23 @@ public class RedissonMultiLock implements RLock { throw new UnsupportedOperationException(); } + @Override + public int addListener(ObjectListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeListener(int listenerId) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture addListenerAsync(ObjectListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + throw new UnsupportedOperationException(); + } } diff --git a/redisson/src/main/java/org/redisson/api/RLock.java b/redisson/src/main/java/org/redisson/api/RLock.java index 7137d42b3..f2ff6730b 100644 --- a/redisson/src/main/java/org/redisson/api/RLock.java +++ b/redisson/src/main/java/org/redisson/api/RLock.java @@ -25,7 +25,7 @@ import java.util.concurrent.locks.Lock; * @author Nikita Koksharov * */ -public interface RLock extends Lock, RLockAsync { +public interface RLock extends Lock, RLockAsync, RObservable { /** * Returns name of object diff --git a/redisson/src/main/java/org/redisson/api/RLockAsync.java b/redisson/src/main/java/org/redisson/api/RLockAsync.java index 464f23ed1..f7f7d53fc 100644 --- a/redisson/src/main/java/org/redisson/api/RLockAsync.java +++ b/redisson/src/main/java/org/redisson/api/RLockAsync.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; * @author Nikita Koksharov * */ -public interface RLockAsync { +public interface RLockAsync extends RObservableAsync { /** * Unlocks the lock independently of its state diff --git a/redisson/src/main/java/org/redisson/api/RLockReactive.java b/redisson/src/main/java/org/redisson/api/RLockReactive.java index 1d34fbc29..780c41752 100644 --- a/redisson/src/main/java/org/redisson/api/RLockReactive.java +++ b/redisson/src/main/java/org/redisson/api/RLockReactive.java @@ -25,7 +25,7 @@ import reactor.core.publisher.Mono; * @author Nikita Koksharov * */ -public interface RLockReactive { +public interface RLockReactive extends RObservableReactive { /** * Returns name of object diff --git a/redisson/src/main/java/org/redisson/api/RLockRx.java b/redisson/src/main/java/org/redisson/api/RLockRx.java index c34a9708e..98687a4cb 100644 --- a/redisson/src/main/java/org/redisson/api/RLockRx.java +++ b/redisson/src/main/java/org/redisson/api/RLockRx.java @@ -26,7 +26,7 @@ import io.reactivex.rxjava3.core.Single; * @author Nikita Koksharov * */ -public interface RLockRx { +public interface RLockRx extends RObservableRx { /** * Returns name of object diff --git a/redisson/src/main/java/org/redisson/api/RObservable.java b/redisson/src/main/java/org/redisson/api/RObservable.java new file mode 100644 index 000000000..e8c9bcc11 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RObservable.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * interface for Lock object listener + * + * @author seakider + * + */ +public interface RObservable { + + /** + * Adds object event listener + * + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener - object event listener + * @return listener id + */ + int addListener(ObjectListener listener); + + /** + * Removes object event listener + * + * @param listenerId - listener id + */ + void removeListener(int listenerId); +} diff --git a/redisson/src/main/java/org/redisson/api/RObservableAsync.java b/redisson/src/main/java/org/redisson/api/RObservableAsync.java new file mode 100644 index 000000000..25096c898 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RObservableAsync.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * Async interface for Lock object listener + * + * @author seakider + * + */ +public interface RObservableAsync { + /** + * Adds object event listener + * + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener - object event listener + * @return listener id + */ + RFuture addListenerAsync(ObjectListener listener); + + /** + * Removes object event listener + * + * @param listenerId - listener id + */ + RFuture removeListenerAsync(int listenerId); +} diff --git a/redisson/src/main/java/org/redisson/api/RObservableReactive.java b/redisson/src/main/java/org/redisson/api/RObservableReactive.java new file mode 100644 index 000000000..e016b0dfe --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RObservableReactive.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import reactor.core.publisher.Mono; + +/** + * Base Reactive interface for Lock object listener + * + * @author seakider + * + */ +public interface RObservableReactive { + /** + * Adds object event listener + * + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener - object event listener + * @return listener id + */ + Mono addListener(ObjectListener listener); + + /** + * Removes object event listener + * + * @param listenerId - listener id + * @return void + */ + Mono removeListener(int listenerId); +} diff --git a/redisson/src/main/java/org/redisson/api/RObservableRx.java b/redisson/src/main/java/org/redisson/api/RObservableRx.java new file mode 100644 index 000000000..9479258af --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RObservableRx.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Single; + +/** + * Base RxJava2 interface for Lock object listener + * + * @author seakider + * + */ +public interface RObservableRx { + + /** + * Adds object event listener + * + * @see org.redisson.api.ExpiredObjectListener + * @see org.redisson.api.DeletedObjectListener + * + * @param listener - object event listener + * @return listener id + */ + Single addListener(ObjectListener listener); + + /** + * Removes object event listener + * + * @param listenerId - listener id + */ + Completable removeListener(int listenerId); +} diff --git a/redisson/src/test/java/org/redisson/RedissonLockReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonLockReactiveTest.java index 36633ba0e..e003dc815 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockReactiveTest.java @@ -1,9 +1,14 @@ package org.redisson; import org.junit.jupiter.api.Test; +import org.redisson.api.DeletedObjectListener; +import org.redisson.api.ExpiredObjectListener; import org.redisson.api.RLockReactive; import reactor.core.publisher.Mono; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; public class RedissonLockReactiveTest extends BaseReactiveTest { @@ -42,4 +47,55 @@ public class RedissonLockReactiveTest extends BaseReactiveTest { assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse(); assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse(); } + + @Test + public void testLockReactiveListener() { + testWithParams(redisson -> { + RLockReactive lock1 = redisson.reactive().getLock("lock1"); + + CountDownLatch latch = new CountDownLatch(4); + Mono listener1 = lock1.addListener(new ExpiredObjectListener() { + @Override + public void onExpired(String name) { + latch.countDown(); + } + }); + Mono listener2 = lock1.addListener(new DeletedObjectListener() { + @Override + public void onDeleted(String name) { + latch.countDown(); + } + }); + int listenerId1 = sync(listener1); + int listenerId2 = sync(listener2); + + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(latch.getCount()).isEqualTo(3); + + try { + sync(lock1.lock(5, TimeUnit.SECONDS)); + Thread.sleep(6000); + assertThat(latch.getCount()).isEqualTo(2); + + sync(lock1.removeListener(listenerId1)); + sync(lock1.lock(5, TimeUnit.SECONDS)); + Thread.sleep(5100); + assertThat(latch.getCount()).isEqualTo(2); + + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(sync(lock1.isLocked())).isFalse(); + assertThat(latch.getCount()).isEqualTo(1); + + sync(lock1.removeListener(listenerId2)); + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(latch.getCount()).isEqualTo(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + }, NOTIFY_KEYSPACE_EVENTS, "Egx"); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonLockRxTest.java b/redisson/src/test/java/org/redisson/RedissonLockRxTest.java index a86802f59..5cb8af835 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockRxTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockRxTest.java @@ -2,9 +2,14 @@ package org.redisson; import io.reactivex.rxjava3.core.Single; import org.junit.jupiter.api.Test; +import org.redisson.api.DeletedObjectListener; +import org.redisson.api.ExpiredObjectListener; import org.redisson.api.RLockRx; import org.redisson.rx.BaseRxTest; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; public class RedissonLockRxTest extends BaseRxTest { @@ -43,4 +48,55 @@ public class RedissonLockRxTest extends BaseRxTest { assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse(); assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse(); } + + @Test + public void testLockRxListener() { + testWithParams(redisson -> { + RLockRx lock1 = redisson.rxJava().getLock("lock1"); + + CountDownLatch latch = new CountDownLatch(4); + Single listener1 = lock1.addListener(new ExpiredObjectListener() { + @Override + public void onExpired(String name) { + latch.countDown(); + } + }); + Single listener2 = lock1.addListener(new DeletedObjectListener() { + @Override + public void onDeleted(String name) { + latch.countDown(); + } + }); + int listenerId1 = sync(listener1); + int listenerId2 = sync(listener2); + + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(latch.getCount()).isEqualTo(3); + + try { + sync(lock1.lock(5, TimeUnit.SECONDS)); + Thread.sleep(6000); + assertThat(latch.getCount()).isEqualTo(2); + + sync(lock1.removeListener(listenerId1)); + sync(lock1.lock(5, TimeUnit.SECONDS)); + Thread.sleep(5100); + assertThat(latch.getCount()).isEqualTo(2); + + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(sync(lock1.isLocked())).isFalse(); + assertThat(latch.getCount()).isEqualTo(1); + + sync(lock1.removeListener(listenerId2)); + sync(lock1.lock(5, TimeUnit.SECONDS)); + sync(lock1.unlock()); + assertThat(latch.getCount()).isEqualTo(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + }, NOTIFY_KEYSPACE_EVENTS, "Egx"); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonLockTest.java b/redisson/src/test/java/org/redisson/RedissonLockTest.java index 7784dc37d..533a3729d 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockTest.java @@ -2,12 +2,13 @@ package org.redisson; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.redisson.api.DeletedObjectListener; +import org.redisson.api.ExpiredObjectListener; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.client.*; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.Config; -import org.redisson.connection.balancer.RandomLoadBalancer; import org.testcontainers.containers.GenericContainer; import java.util.List; @@ -534,4 +535,54 @@ public class RedissonLockTest extends BaseConcurrentTest { Assertions.assertEquals(iterations, lockedCounter.get()); } + @Test + public void testLockListener() { + testWithParams(redisson -> { + RLock lock1 = redisson.getLock("lock1"); + CountDownLatch latch = new CountDownLatch(4); + int listenerId1 = lock1.addListener(new ExpiredObjectListener() { + @Override + public void onExpired(String name) { + latch.countDown(); + } + }); + int listenerId2 = lock1.addListener(new DeletedObjectListener() { + @Override + public void onDeleted(String name) { + latch.countDown(); + } + }); + + lock1.lock(5, TimeUnit.SECONDS); + lock1.unlock(); + assertThat(lock1.isLocked()).isFalse(); + assertThat(latch.getCount()).isEqualTo(3); + + try { + lock1.lock(5, TimeUnit.SECONDS); + Thread.sleep(6000); + assertThat(latch.getCount()).isEqualTo(2); + + lock1.removeListener(listenerId1); + lock1.lock(5, TimeUnit.SECONDS); + Thread.sleep(5100); + assertThat(latch.getCount()).isEqualTo(2); + + lock1.lock(5, TimeUnit.SECONDS); + lock1.unlock(); + assertThat(lock1.isLocked()).isFalse(); + assertThat(latch.getCount()).isEqualTo(1); + + lock1.removeListener(listenerId2); + lock1.lock(5, TimeUnit.SECONDS); + lock1.unlock(); + assertThat(lock1.isLocked()).isFalse(); + assertThat(latch.getCount()).isEqualTo(1); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + }, NOTIFY_KEYSPACE_EVENTS, "Egx"); + } } From f72cc9ed92050427f2e2f2c942b8efeb0ef61245 Mon Sep 17 00:00:00 2001 From: flyawayish Date: Sun, 12 Jan 2025 00:01:17 +0800 Subject: [PATCH 2/3] Fixed - worker stop working #6387 Signed-off-by: flyawayish --- .../org/redisson/RedissonRemoteService.java | 32 +++++++++---------- .../executor/RedissonExecutorServiceTest.java | 17 ++++++++++ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 21fea8e0a..f8f5c7557 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Method; -import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -45,10 +44,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS public static class Entry { RFuture future; - final AtomicInteger freeWorkers; + final AtomicInteger counter; public Entry(int workers) { - freeWorkers = new AtomicInteger(workers); + counter = new AtomicInteger(workers); } public void setFuture(RFuture future) { @@ -59,8 +58,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return future; } - public AtomicInteger getFreeWorkers() { - return freeWorkers; + public AtomicInteger getCounter() { + return counter; } } @@ -138,7 +137,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (entry == null) { return 0; } - return entry.getFreeWorkers().get(); + return entry.getCounter().get(); } @Override @@ -236,7 +235,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - log.debug("subscribe: {}, free workers: {}", remoteInterface, entry.getFreeWorkers()); + log.debug("subscribe: {}, entry counter: {}", remoteInterface, entry.getCounter()); RFuture take = requestQueue.pollAsync(60, TimeUnit.SECONDS); entry.setFuture(take); @@ -259,12 +258,15 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - if (entry.getFreeWorkers().get() == 0) { + // do not subscribe now, see + // https://github.com/mrniko/redisson/issues/493 + // subscribe(remoteInterface, requestQueue); + + if (entry.getCounter().get() == 0) { return; } - - int freeWorkers = entry.getFreeWorkers().decrementAndGet(); - if (freeWorkers > 0 && requestId != null) { + + if (entry.getCounter().decrementAndGet() > 0) { subscribe(remoteInterface, requestQueue, executor, bean); } @@ -414,7 +416,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (request.getOptions().getExecutionTimeoutInMillis() != null) { timeout = request.getOptions().getExecutionTimeoutInMillis(); } - long tt = timeout; RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); try { @@ -426,9 +427,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } else { response = result; } - - CompletionStage clientsFuture = queue.putAsync(response) - .thenCompose(s -> queue.expireAsync(Duration.ofMillis(tt))); + RFuture clientsFuture = queue.putAsync(response); + queue.expireAsync(timeout, TimeUnit.MILLISECONDS); clientsFuture.whenComplete((res, exc) -> { if (exc != null) { @@ -507,7 +507,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.debug("resubscribe: {}, queue: {}", remoteInterface, requestQueue.getName()); - if (entry != null && entry.getFreeWorkers().getAndIncrement() == 0) { + if (entry != null && entry.getCounter().getAndIncrement() == 0) { // re-subscribe anyways after the method invocation subscribe(remoteInterface, requestQueue, executor, bean); } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 4382a63e4..f2f0adb18 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -694,4 +694,21 @@ public class RedissonExecutorServiceTest extends RedisDockerTest { assertThat(counter.get()).isGreaterThan(0); } + @Test + public void testSubmitAfterPause() throws InterruptedException { + + RExecutorService redissonES = redisson.getExecutorService("test-worker"); + redissonES.registerWorkers(WorkerOptions.defaults().workers(2)); + + redissonES.submit(new RunnableTask()); + Thread.sleep(Duration.ofSeconds(1)); + assertThat(redissonES.getTaskCount()).isEqualTo(0); + + Thread.sleep(Duration.ofMinutes(1)); + + redissonES.submit(new RunnableTask()); + Thread.sleep(Duration.ofSeconds(1)); + assertThat(redissonES.getTaskCount()).isEqualTo(0); + } + } From f538b3f73286e28b252691b7adeb1028e6e30008 Mon Sep 17 00:00:00 2001 From: flyawayish Date: Sun, 12 Jan 2025 14:01:19 +0800 Subject: [PATCH 3/3] Fixed - worker stop working #6387 Signed-off-by: flyawayish --- .../org/redisson/RedissonRemoteService.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index f8f5c7557..7aff15bef 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -44,10 +45,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS public static class Entry { RFuture future; - final AtomicInteger counter; + final AtomicInteger freeWorkers; public Entry(int workers) { - counter = new AtomicInteger(workers); + freeWorkers = new AtomicInteger(workers); } public void setFuture(RFuture future) { @@ -58,8 +59,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return future; } - public AtomicInteger getCounter() { - return counter; + public AtomicInteger getFreeWorkers() { + return freeWorkers; } } @@ -137,7 +138,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (entry == null) { return 0; } - return entry.getCounter().get(); + return entry.getFreeWorkers().get(); } @Override @@ -235,7 +236,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - log.debug("subscribe: {}, entry counter: {}", remoteInterface, entry.getCounter()); + log.debug("subscribe: {}, free workers: {}", remoteInterface, entry.getFreeWorkers()); RFuture take = requestQueue.pollAsync(60, TimeUnit.SECONDS); entry.setFuture(take); @@ -258,15 +259,12 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - // do not subscribe now, see - // https://github.com/mrniko/redisson/issues/493 - // subscribe(remoteInterface, requestQueue); - - if (entry.getCounter().get() == 0) { + if (entry.getFreeWorkers().get() == 0) { return; } - - if (entry.getCounter().decrementAndGet() > 0) { + + int freeWorkers = entry.getFreeWorkers().decrementAndGet(); + if (freeWorkers > 0) { subscribe(remoteInterface, requestQueue, executor, bean); } @@ -416,6 +414,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (request.getOptions().getExecutionTimeoutInMillis() != null) { timeout = request.getOptions().getExecutionTimeoutInMillis(); } + long tt = timeout; RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); try { @@ -427,8 +426,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } else { response = result; } - RFuture clientsFuture = queue.putAsync(response); - queue.expireAsync(timeout, TimeUnit.MILLISECONDS); + + CompletionStage clientsFuture = queue.putAsync(response) + .thenCompose(s -> queue.expireAsync(Duration.ofMillis(tt))); clientsFuture.whenComplete((res, exc) -> { if (exc != null) { @@ -507,7 +507,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS log.debug("resubscribe: {}, queue: {}", remoteInterface, requestQueue.getName()); - if (entry != null && entry.getCounter().getAndIncrement() == 0) { + if (entry != null && entry.getFreeWorkers().getAndIncrement() == 0) { // re-subscribe anyways after the method invocation subscribe(remoteInterface, requestQueue, executor, bean); }