From dc63c8057a59f60eaf04382d3d980b8c40a37155 Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 22 Jul 2024 23:41:08 +0800 Subject: [PATCH] Feature - support json.mget Signed-off-by: seakider --- .../src/main/java/org/redisson/Redisson.java | 7 +- .../org/redisson/RedissonJsonBuckets.java | 147 ++++++++++++++++++ .../java/org/redisson/RedissonReactive.java | 7 +- .../main/java/org/redisson/RedissonRx.java | 7 +- .../java/org/redisson/api/RJsonBuckets.java | 59 +++++++ .../org/redisson/api/RJsonBucketsAsync.java | 59 +++++++ .../redisson/api/RJsonBucketsReactive.java | 60 +++++++ .../java/org/redisson/api/RJsonBucketsRx.java | 61 ++++++++ .../java/org/redisson/api/RedissonClient.java | 11 +- .../redisson/api/RedissonReactiveClient.java | 11 +- .../org/redisson/api/RedissonRxClient.java | 11 +- .../org/redisson/RedissonJsonBucketsTest.java | 73 +++++++++ 12 files changed, 507 insertions(+), 6 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/RedissonJsonBuckets.java create mode 100644 redisson/src/main/java/org/redisson/api/RJsonBuckets.java create mode 100644 redisson/src/main/java/org/redisson/api/RJsonBucketsAsync.java create mode 100644 redisson/src/main/java/org/redisson/api/RJsonBucketsReactive.java create mode 100644 redisson/src/main/java/org/redisson/api/RJsonBucketsRx.java create mode 100644 redisson/src/test/java/org/redisson/RedissonJsonBucketsTest.java diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index a65ee9b36..37c39e32f 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -292,7 +292,12 @@ public final class Redisson implements RedissonClient { JsonBucketParams params = (JsonBucketParams) options; return new RedissonJsonBucket<>(params.getCodec(), commandExecutor, params.getName()); } - + + @Override + public RJsonBuckets getJsonBuckets(JsonCodec codec) { + return new RedissonJsonBuckets(codec, commandExecutor); + } + @Override public RHyperLogLog getHyperLogLog(String name) { return new RedissonHyperLogLog(commandExecutor, name); diff --git a/redisson/src/main/java/org/redisson/RedissonJsonBuckets.java b/redisson/src/main/java/org/redisson/RedissonJsonBuckets.java new file mode 100644 index 000000000..324ec8823 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonJsonBuckets.java @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.api.RFuture; +import org.redisson.api.RJsonBuckets; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.JsonCodec; +import org.redisson.codec.JsonCodecWrapper; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.decoder.BucketsDecoder; +import org.redisson.connection.decoder.MapGetAllDecoder; +import org.redisson.misc.CompletableFutureWrapper; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class RedissonJsonBuckets implements RJsonBuckets { + + protected final JsonCodec codec; + protected final CommandAsyncExecutor commandExecutor; + + public RedissonJsonBuckets(JsonCodec codec, CommandAsyncExecutor commandExecutor) { + this.codec = codec; + this.commandExecutor = commandExecutor; + } + + @Override + public Map get(String... keys) { + RFuture> future = getAsync(keys); + return commandExecutor.get(future); + } + + @Override + public RFuture> getAsync(String... keys) { + return getAsync(codec, ".", keys); + } + + @Override + public Map get(JsonCodec codec, String path, String... keys) { + RFuture> future = getAsync(codec, path, keys); + return commandExecutor.get(future); + } + + @Override + public RFuture> getAsync(JsonCodec codec, String path, String... keys) { + if (keys.length == 0) { + return new CompletableFutureWrapper<>(Collections.emptyMap()); + } + + List keysList = Arrays.stream(keys) + .map(k -> commandExecutor.getServiceManager().getConfig().getNameMapper().map(k)) + .collect(Collectors.toList()); + + JsonCodecWrapper jsonCodec = new JsonCodecWrapper(codec); + RedisCommand> command = new RedisCommand>("JSON.MGET", new MapGetAllDecoder(keysList, 0)); + return commandExecutor.readBatchedAsync(jsonCodec, command, new SlotCallback, Map>() { + final Map results = new ConcurrentHashMap<>(); + + @Override + public void onSlotResult(List keys, Map result) { + for (Map.Entry entry : result.entrySet()) { + if (entry.getKey() != null && entry.getValue() != null) { + String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey()); + results.put(key, (V) entry.getValue()); + } + } + } + + @Override + public Map onFinish() { + return results; + } + + @Override + public RedisCommand> createCommand(List keys) { + return new RedisCommand<>("JSON.MGET", new BucketsDecoder(keys)); + } + + @Override + public Object[] createParams(List params) { + ArrayList newParams = new ArrayList<>(params); + newParams.add(path); + return newParams.toArray(); + } + }, keysList.toArray(new Object[0])); + } + + @Override + public void set(Map buckets) { + commandExecutor.get(setAsync(buckets)); + } + + @Override + public RFuture setAsync(Map buckets) { + return setAsync(codec, ".", buckets); + } + + @Override + public void set(JsonCodec codec, String path, Map buckets) { + commandExecutor.get(setAsync(codec, path, buckets)); + } + + @Override + public RFuture setAsync(JsonCodec codec, String path, Map buckets) { + if (buckets.isEmpty()) { + return new CompletableFutureWrapper<>((Void) null); + } + + Map mappedBuckets = buckets.entrySet().stream().collect( + Collectors.toMap(e -> commandExecutor.getServiceManager().getConfig().getNameMapper().map(e.getKey()), + Map.Entry::getValue)); + JsonCodecWrapper jsonCodec = new JsonCodecWrapper(codec); + return commandExecutor.writeBatchedAsync(jsonCodec, RedisCommands.JSON_MSET, new VoidSlotCallback() { + @Override + public Object[] createParams(List keys) { + List params = new ArrayList<>(keys.size()); + for (Object key : keys) { + params.add(key); + params.add(path); + try { + params.add(jsonCodec.getValueEncoder().encode(mappedBuckets.get(key))); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + return params.toArray(); + } + }, mappedBuckets.keySet().toArray(new Object[0])); + } +} \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 2397f5965..ea5792be1 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -433,7 +433,12 @@ public class RedissonReactive implements RedissonReactiveClient { return ReactiveProxyBuilder.create(commandExecutor, new RedissonJsonBucket(params.getCodec(), ca, params.getName()), RJsonBucketReactive.class); } - + + @Override + public RJsonBucketsReactive getJsonBuckets(JsonCodec codec) { + return ReactiveProxyBuilder.create(commandExecutor, new RedissonJsonBuckets(codec, commandExecutor), RJsonBucketsReactive.class); + } + @Override public RHyperLogLogReactive getHyperLogLog(String name) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(commandExecutor, name), RHyperLogLogReactive.class); diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 210f7ab09..da1014608 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -367,7 +367,12 @@ public class RedissonRx implements RedissonRxClient { CommandRxExecutor ce = commandExecutor.copy(params); return RxProxyBuilder.create(commandExecutor, new RedissonJsonBucket<>(params.getCodec(), ce, params.getName()), RJsonBucketRx.class); } - + + @Override + public RJsonBucketsRx getJsonBuckets(JsonCodec codec) { + return RxProxyBuilder.create(commandExecutor, new RedissonJsonBuckets(codec, commandExecutor), RJsonBucketsRx.class); + } + @Override public RHyperLogLogRx getHyperLogLog(String name) { return RxProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(commandExecutor, name), RHyperLogLogRx.class); diff --git a/redisson/src/main/java/org/redisson/api/RJsonBuckets.java b/redisson/src/main/java/org/redisson/api/RJsonBuckets.java new file mode 100644 index 000000000..48b9e9e6b --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RJsonBuckets.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.codec.JsonCodec; + +import java.util.Map; + +public interface RJsonBuckets extends RJsonBucketsAsync { + + /** + * Returns Redis json object mapped by key with default path + * + * @param keys keys + * @param type of object with specific json-path + * @return Map with name as key and bucket as value + */ + Map get(String... keys); + + /** + * Returns Redis json object mapped by key with specific path + * + * @param codec codec for specific path + * @param path json path + * @param keys keys + * @param type of value at specific json-path + * @return Map with name as key and bucket as value + */ + Map get(JsonCodec codec, String path, String... keys); + + /** + * Saves json objects with default path mapped by Redis key. + * + * @param buckets map of json buckets + */ + void set(Map buckets); + + /** + * Saves json objects with specific path mapped by Redis key. + * + * @param codec codec for specific path + * @param path json path + * @param buckets map of json buckets + */ + void set(JsonCodec codec, String path, Map buckets); +} diff --git a/redisson/src/main/java/org/redisson/api/RJsonBucketsAsync.java b/redisson/src/main/java/org/redisson/api/RJsonBucketsAsync.java new file mode 100644 index 000000000..a70d4bfc6 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RJsonBucketsAsync.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.codec.JsonCodec; + +import java.util.Map; + +public interface RJsonBucketsAsync { + + /** + * Returns Redis json object mapped by key with default path + * + * @param keys keys + * @param type of object with specific json-path + * @return Map with name as key and bucket as value + */ + RFuture> getAsync(String... keys); + + /** + * Returns Redis json object mapped by key with specific path + * + * @param codec codec for specific path + * @param path json path + * @param keys keys + * @param type of value at specific json-path + * @return Map with name as key and bucket as value + */ + RFuture> getAsync(JsonCodec codec, String path, String... keys); + + /** + * Saves json objects with default path mapped by Redis key. + * + * @param buckets map of json buckets + */ + RFuture setAsync(Map buckets); + + /** + * Saves json objects with specific path mapped by Redis key. + * + * @param codec codec for specific path + * @param path json path + * @param buckets map of json buckets + */ + RFuture setAsync(JsonCodec codec, String path, Map buckets); +} diff --git a/redisson/src/main/java/org/redisson/api/RJsonBucketsReactive.java b/redisson/src/main/java/org/redisson/api/RJsonBucketsReactive.java new file mode 100644 index 000000000..86ac3470d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RJsonBucketsReactive.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.codec.JsonCodec; +import reactor.core.publisher.Mono; + +import java.util.Map; + +public interface RJsonBucketsReactive { + + /** + * Returns Redis json object mapped by key with default path + * + * @param keys keys + * @param type of object with specific json-path + * @return Map with name as key and bucket as value + */ + Mono> get(String... keys); + + /** + * Returns Redis json object mapped by key with specific path + * + * @param codec codec for specific path + * @param path json path + * @param keys keys + * @param type of value at specific json-path + * @return Map with name as key and bucket as value + */ + Mono> get(JsonCodec codec, String path, String... keys); + + /** + * Saves json objects with default path mapped by Redis key. + * + * @param buckets map of json buckets + */ + Mono set(Map buckets); + + /** + * Saves json objects with specific path mapped by Redis key. + * + * @param codec codec for specific path + * @param path json path + * @param buckets map of json buckets + */ + Mono set(JsonCodec codec, String path, Map buckets); +} diff --git a/redisson/src/main/java/org/redisson/api/RJsonBucketsRx.java b/redisson/src/main/java/org/redisson/api/RJsonBucketsRx.java new file mode 100644 index 000000000..b288a0e3e --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RJsonBucketsRx.java @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Single; +import org.redisson.codec.JsonCodec; + +import java.util.Map; + +public interface RJsonBucketsRx { + + /** + * Returns Redis json object mapped by key with default path + * + * @param keys keys + * @param type of object with specific json-path + * @return Map with name as key and bucket as value + */ + Single> get(String... keys); + + /** + * Returns Redis json object mapped by key with specific path + * + * @param codec codec for specific path + * @param path json path + * @param keys keys + * @param type of value at specific json-path + * @return Map with name as key and bucket as value + */ + Single> get(JsonCodec codec, String path, String... keys); + + /** + * Saves json objects with default path mapped by Redis key. + * + * @param buckets map of json buckets + */ + Completable set(Map buckets); + + /** + * Saves json objects with specific path mapped by Redis key. + * + * @param codec codec for specific path + * @param path json path + * @param buckets map of json buckets + */ + Completable set(JsonCodec codec, String path, Map buckets); +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 92b8ec63a..8b9a3d77d 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -371,7 +371,16 @@ public interface RedissonClient { * @return JsonBucket object */ RJsonBucket getJsonBucket(JsonBucketOptions options); - + + /** + * Returns API for mass operations over JsonBucket objects + * using provided codec for JSON object with default path. + * + * @param codec using provided codec for JSON object with default path. + * @return JsonBuckets + */ + RJsonBuckets getJsonBuckets(JsonCodec codec); + /** * Returns HyperLogLog instance by name. * diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index e04e6f0b7..2aabde9e0 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -602,7 +602,16 @@ public interface RedissonReactiveClient { * @return JsonBucket object */ RJsonBucketReactive getJsonBucket(JsonBucketOptions options); - + + /** + * Returns API for mass operations over JsonBucket objects + * using provided codec for JSON object with default path. + * + * @param codec using provided codec for JSON object with default path. + * @return JsonBuckets + */ + RJsonBucketsReactive getJsonBuckets(JsonCodec codec); + /** * Returns HyperLogLog instance by name. * diff --git a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java index 8f7cd33bb..8a199bcfb 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java @@ -591,7 +591,16 @@ public interface RedissonRxClient { * @return JsonBucket object */ RJsonBucketRx getJsonBucket(JsonBucketOptions options); - + + /** + * Returns API for mass operations over JsonBucket objects + * using provided codec for JSON object with default path. + * + * @param codec using provided codec for JSON object with default path. + * @return JsonBuckets + */ + RJsonBucketsRx getJsonBuckets(JsonCodec codec); + /** * Returns HyperLogLog instance by name. * diff --git a/redisson/src/test/java/org/redisson/RedissonJsonBucketsTest.java b/redisson/src/test/java/org/redisson/RedissonJsonBucketsTest.java new file mode 100644 index 000000000..f69f3a3b7 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonJsonBucketsTest.java @@ -0,0 +1,73 @@ +package org.redisson; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.junit.jupiter.api.Test; +import org.redisson.api.RJsonBuckets; +import org.redisson.client.codec.IntegerCodec; +import org.redisson.codec.JacksonCodec; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.redisson.RedissonJsonBucketTest.NestedType; +import static org.redisson.RedissonJsonBucketTest.TestType; + +public class RedissonJsonBucketsTest extends DockerRedisStackTest { + + @Test + public void testSetAndGet() { + RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class)); + Map map = new HashMap<>(); + IntStream.range(0, 1000).forEach(i -> { + TestType testType = new TestType(); + testType.setName("name" + i); + map.put(testType.getName(), testType); + }); + buckets.set(map); + + Map stringObjectMap = buckets.get(map.keySet().toArray(new String[]{})); + assertThat(stringObjectMap.size()).isEqualTo(1000); + } + + @Test + public void testGetWithPath() { + RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class)); + Map map = new HashMap<>(); + IntStream.range(0, 1000).forEach(i -> { + TestType testType = new TestType(); + testType.setName("name" + i); + NestedType nestedType = new NestedType(); + nestedType.setValue(i); + testType.setType(nestedType); + map.put(testType.getName(), testType); + }); + buckets.set(map); + + Map> result = buckets.get(new JacksonCodec<>(new TypeReference>() {}), "$.type.value", map.keySet().toArray(new String[]{})); + assertThat(result.size()).isEqualTo(1000); + } + + @Test + public void testSetWithPath() { + RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class)); + Map map = new HashMap<>(); + IntStream.range(0, 1000).forEach(i -> { + TestType testType = new TestType(); + testType.setName("name" + i); + NestedType nestedType = new NestedType(); + nestedType.setValue(i); + testType.setType(nestedType); + map.put(testType.getName(), testType); + }); + buckets.set(map); + + map.clear(); + IntStream.range(0, 1000).forEach(i -> { + map.put("name" + i, i + 1); + }); + buckets.set(new IntegerCodec(), "$.type.value", map); + } +}