diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index b5d88957d..a831f931d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -171,12 +171,20 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public RFuture containsAsync(Collection objects) { - CompletionStage future = CompletableFuture.completedFuture(null); + CompletionStage f = CompletableFuture.completedFuture(null); if (size == 0) { - future = readConfigAsync(); + f = readConfigAsync().handle((r, e) -> { + if (e instanceof IllegalArgumentException) { + return 0L; + } + return null; + }); } - CompletionStage f = future.thenCompose(r -> { + f = f.thenCompose(r -> { + if (r != null) { + return CompletableFuture.completedFuture(r); + } List allIndexes = index(objects); List params = new ArrayList<>(); @@ -188,7 +196,9 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local size = redis.call('hget', KEYS[1], 'size');" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + - "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + + "if size ~= ARGV[1] or hashIterations ~= ARGV[2] then " + + "return 0;" + + "end;" + "local k = 0;" + "local c = 0;" + diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index d107f4bbe..d50e16638 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -260,7 +260,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "local v = redis.call('zrange', KEYS[2], 0, 0); " // if new task added to queue head then publish its startTime // to all scheduler workers - + "if v[1] == expiredTaskIds[i] then " + + "if v[1] == scheduledName then " + "redis.call('publish', KEYS[3], startTime); " + "end;" diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 796c0c403..70b7e5e4b 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -132,12 +132,13 @@ public class TasksService extends BaseRemoteService { + "end; " + "if tonumber(ARGV[1]) > 0 then " + + "local scheduledName = 'ff:' .. ARGV[2];" + "redis.call('set', KEYS[7], ARGV[4]);" - + "redis.call('zadd', KEYS[3], ARGV[1], 'ff:' .. ARGV[2]);" + + "redis.call('zadd', KEYS[3], ARGV[1], scheduledName);" + "local v = redis.call('zrange', KEYS[3], 0, 0); " // if new task added to queue head then publish its startTime // to all scheduler workers - + "if v[1] == ARGV[2] then " + + "if v[1] == scheduledName then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end; " + "end;" diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index 720fcc17a..009baab2f 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -201,7 +201,7 @@ public abstract class BaseRemoteProxy { List list = entry.getResponses().get(key); if (list == null) { pollResponse(); - return null; + return entry; } Result res = list.remove(0); diff --git a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java index 213b168d9..d242beb77 100644 --- a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java @@ -110,14 +110,6 @@ public class RedissonBloomFilterTest extends RedisDockerTest { assertThat(redisson.getKeys().count()).isZero(); } - @Test - public void testNotInitializedOnContains() { - Assertions.assertThrows(RedisException.class, () -> { - RBloomFilter filter = redisson.getBloomFilter("filter"); - filter.contains("32"); - }); - } - @Test public void testNotInitializedOnAdd() { Assertions.assertThrows(RedisException.class, () -> { @@ -194,4 +186,17 @@ public class RedissonBloomFilterTest extends RedisDockerTest { assertThat(newFilter.count()).isEqualTo(1); assertThat(newFilter.contains("123")).isTrue(); } + + @Test + public void testContainsException() { + RBloomFilter f1 = redisson.getBloomFilter("filter"); + assertThat(f1.contains("1")).isFalse(); + f1.tryInit(100, 0.03); + + RBloomFilter f2 = redisson.getBloomFilter("filter"); + f2.delete(); + f2.tryInit(200, 0.03); + + assertThat(f1.contains("1")).isFalse(); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 294794ef6..492097914 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.io.NotSerializableException; import java.io.Serializable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -919,4 +920,18 @@ public class RedissonRemoteServiceTest extends RedisDockerTest { r1.shutdown(); r2.shutdown(); } + + @Test + public void testDelayMethod() throws InterruptedException { + RedissonClient client = createInstance(); + RRemoteService r1 = client.getRemoteService(); + r1.register(RemoteInterface.class, new RemoteImpl()); + + RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS); + Assertions.assertThrows(RemoteServiceTimeoutException.class, () -> r1.get(RemoteInterface.class, options).timeoutMethod()); + + RFuture future = r1.get(RemoteInterfaceAsync.class, options).timeoutMethod(); + Thread.sleep(3000); + assertThat(future.isDone()).isEqualTo(true); + } }