Feature - RedissonKeys implements UnlinkByPattern and unlinkByPatternAsync methods

Signed-off-by: seakider <seakider@gmail.com>
pull/5949/head
seakider 8 months ago
parent 83c536ea64
commit db8324f5ea

@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -235,6 +236,29 @@ public class RedissonKeys implements RKeys {
@Override
public RFuture<Long> deleteByPatternAsync(String pattern) {
return eraseByPatternAsync(false, pattern);
}
@Override
public long unlinkByPattern(String pattern) {
return commandExecutor.get(unlinkByPatternAsync(pattern));
}
@Override
public RFuture<Long> unlinkByPatternAsync(String pattern) {
return eraseByPatternAsync(true, pattern);
}
private RFuture<Long> eraseByPatternAsync(boolean unlinkMode, String pattern) {
String commandName;
Function<String[], Long> delegate;
if (unlinkMode) {
commandName = RedisCommands.UNLINK.getName();
delegate = this::unlink;
} else {
commandName = RedisCommands.DEL.getName();
delegate = this::delete;
}
if (commandExecutor instanceof CommandBatchService
|| commandExecutor instanceof CommandReactiveBatchService
|| commandExecutor instanceof CommandRxBatchService) {
@ -246,9 +270,9 @@ public class RedissonKeys implements RKeys {
"local keys = redis.call('keys', ARGV[1]) "
+ "local n = 0 "
+ "for i=1, #keys,5000 do "
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "n = n + redis.call(ARGV[2], unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
+ "end "
+ "return n;", Collections.emptyList(), pattern);
+ "return n;", Collections.emptyList(), pattern, commandName);
}
int batchSize = 500;
@ -266,13 +290,13 @@ public class RedissonKeys implements RKeys {
keys.add(key);
if (keys.size() % batchSize == 0) {
count += delete(keys.toArray(new String[0]));
count += delegate.apply(keys.toArray(new String[0]));
keys.clear();
}
}
if (!keys.isEmpty()) {
count += delete(keys.toArray(new String[0]));
count += delegate.apply(keys.toArray(new String[0]));
keys.clear();
}

@ -304,6 +304,21 @@ public interface RKeys extends RKeysAsync {
*/
long deleteByPattern(String pattern);
/**
* Unlink multiple objects by a key pattern.
* <p>
* Method executes in <b>NON atomic way</b> in cluster mode due to lua script limitations.
* <p>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return number of removed keys
*/
long unlinkByPattern(String pattern);
/**
* Delete multiple objects
*

@ -172,6 +172,21 @@ public interface RKeysAsync {
*/
RFuture<Long> deleteByPatternAsync(String pattern);
/**
* Unlink multiple objects by a key pattern.
* <p>
* Method executes in <b>NON atomic way</b> in cluster mode due to lua script limitations.
* <p>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return number of removed keys
*/
RFuture<Long> unlinkByPatternAsync(String pattern);
/**
* Delete multiple objects
*

@ -3,6 +3,8 @@ package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.redisson.api.*;
import org.redisson.api.listener.FlushListener;
import org.redisson.api.listener.NewObjectListener;
@ -292,8 +294,9 @@ public class RedissonKeysTest extends RedisDockerTest {
});
}
@Test
public void testDeleteByPattern() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeleteByPattern(boolean unlinkMode) {
RBucket<String> bucket = redisson.getBucket("test0");
bucket.set("someValue3");
assertThat(bucket.isExists()).isTrue();
@ -310,13 +313,20 @@ public class RedissonKeysTest extends RedisDockerTest {
map2.fastPut("1", "5");
assertThat(map2.isExists()).isTrue();
assertThat(redisson.getKeys().deleteByPattern("test?")).isEqualTo(4);
assertThat(redisson.getKeys().deleteByPattern("test?")).isZero();
if(unlinkMode) {
assertThat(redisson.getKeys().unlinkByPattern("test?")).isEqualTo(4);
assertThat(redisson.getKeys().unlinkByPattern("test?")).isZero();
} else {
assertThat(redisson.getKeys().deleteByPattern("test?")).isEqualTo(4);
assertThat(redisson.getKeys().deleteByPattern("test?")).isZero();
}
assertThat(redisson.getKeys().count()).isZero();
}
@Test
public void testDeleteByPatternBatch() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeleteByPatternBatch(boolean unlinkMode) {
RBucket<String> bucket = redisson.getBucket("test0");
bucket.set("someValue3");
assertThat(bucket.isExists()).isTrue();
@ -335,7 +345,11 @@ public class RedissonKeysTest extends RedisDockerTest {
RBatch batch = redisson.createBatch();
batch.getKeys().deleteByPatternAsync("test?");
if(unlinkMode) {
batch.getKeys().unlinkByPatternAsync("test?");
} else {
batch.getKeys().deleteByPatternAsync("test?");
}
BatchResult<?> r = batch.execute();
Assertions.assertEquals(4L, r.getResponses().get(0));
}

Loading…
Cancel
Save