Merge branch 'master' of github.com:redisson/redisson

pull/6370/head
Nikita Koksharov 1 month ago
commit d19f1df9e6

@ -171,12 +171,20 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public RFuture<Long> containsAsync(Collection<T> objects) {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
CompletionStage<Long> f = CompletableFuture.completedFuture(null);
if (size == 0) {
future = readConfigAsync();
f = readConfigAsync().handle((r, e) -> {
if (e instanceof IllegalArgumentException) {
return 0L;
}
return null;
});
}
CompletionStage<Long> f = future.thenCompose(r -> {
f = f.thenCompose(r -> {
if (r != null) {
return CompletableFuture.completedFuture(r);
}
List<Long> allIndexes = index(objects);
List<Object> params = new ArrayList<>();
@ -188,7 +196,9 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +
"if size ~= ARGV[1] or hashIterations ~= ARGV[2] then " +
"return 0;" +
"end;" +
"local k = 0;" +
"local c = 0;" +

@ -260,7 +260,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == expiredTaskIds[i] then "
+ "if v[1] == scheduledName then "
+ "redis.call('publish', KEYS[3], startTime); "
+ "end;"

@ -132,12 +132,13 @@ public class TasksService extends BaseRemoteService {
+ "end; "
+ "if tonumber(ARGV[1]) > 0 then "
+ "local scheduledName = 'ff:' .. ARGV[2];"
+ "redis.call('set', KEYS[7], ARGV[4]);"
+ "redis.call('zadd', KEYS[3], ARGV[1], 'ff:' .. ARGV[2]);"
+ "redis.call('zadd', KEYS[3], ARGV[1], scheduledName);"
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "if v[1] == ARGV[2] then "
+ "if v[1] == scheduledName then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end; "
+ "end;"

@ -201,7 +201,7 @@ public abstract class BaseRemoteProxy {
List<Result> list = entry.getResponses().get(key);
if (list == null) {
pollResponse();
return null;
return entry;
}
Result res = list.remove(0);

@ -110,14 +110,6 @@ public class RedissonBloomFilterTest extends RedisDockerTest {
assertThat(redisson.getKeys().count()).isZero();
}
@Test
public void testNotInitializedOnContains() {
Assertions.assertThrows(RedisException.class, () -> {
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.contains("32");
});
}
@Test
public void testNotInitializedOnAdd() {
Assertions.assertThrows(RedisException.class, () -> {
@ -194,4 +186,17 @@ public class RedissonBloomFilterTest extends RedisDockerTest {
assertThat(newFilter.count()).isEqualTo(1);
assertThat(newFilter.contains("123")).isTrue();
}
@Test
public void testContainsException() {
RBloomFilter<String> f1 = redisson.getBloomFilter("filter");
assertThat(f1.contains("1")).isFalse();
f1.tryInit(100, 0.03);
RBloomFilter<String> f2 = redisson.getBloomFilter("filter");
f2.delete();
f2.tryInit(200, 0.03);
assertThat(f1.contains("1")).isFalse();
}
}

@ -17,6 +17,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -919,4 +920,18 @@ public class RedissonRemoteServiceTest extends RedisDockerTest {
r1.shutdown();
r2.shutdown();
}
@Test
public void testDelayMethod() throws InterruptedException {
RedissonClient client = createInstance();
RRemoteService r1 = client.getRemoteService();
r1.register(RemoteInterface.class, new RemoteImpl());
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS);
Assertions.assertThrows(RemoteServiceTimeoutException.class, () -> r1.get(RemoteInterface.class, options).timeoutMethod());
RFuture<Void> future = r1.get(RemoteInterfaceAsync.class, options).timeoutMethod();
Thread.sleep(3000);
assertThat(future.isDone()).isEqualTo(true);
}
}

Loading…
Cancel
Save