Feature - support json.mget
Signed-off-by: seakider <seakider@gmail.com>pull/6051/head
parent
29ee9500aa
commit
dc63c8057a
@ -0,0 +1,147 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2024 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import org.redisson.api.RFuture;
|
||||
import org.redisson.api.RJsonBuckets;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.codec.JsonCodec;
|
||||
import org.redisson.codec.JsonCodecWrapper;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.connection.decoder.BucketsDecoder;
|
||||
import org.redisson.connection.decoder.MapGetAllDecoder;
|
||||
import org.redisson.misc.CompletableFutureWrapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RedissonJsonBuckets implements RJsonBuckets {
|
||||
|
||||
protected final JsonCodec codec;
|
||||
protected final CommandAsyncExecutor commandExecutor;
|
||||
|
||||
public RedissonJsonBuckets(JsonCodec codec, CommandAsyncExecutor commandExecutor) {
|
||||
this.codec = codec;
|
||||
this.commandExecutor = commandExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Map<String, V> get(String... keys) {
|
||||
RFuture<Map<String, V>> future = getAsync(keys);
|
||||
return commandExecutor.get(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> RFuture<Map<String, V>> getAsync(String... keys) {
|
||||
return getAsync(codec, ".", keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Map<String, V> get(JsonCodec codec, String path, String... keys) {
|
||||
RFuture<Map<String, V>> future = getAsync(codec, path, keys);
|
||||
return commandExecutor.get(future);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> RFuture<Map<String, V>> getAsync(JsonCodec codec, String path, String... keys) {
|
||||
if (keys.length == 0) {
|
||||
return new CompletableFutureWrapper<>(Collections.emptyMap());
|
||||
}
|
||||
|
||||
List<Object> keysList = Arrays.stream(keys)
|
||||
.map(k -> commandExecutor.getServiceManager().getConfig().getNameMapper().map(k))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
JsonCodecWrapper jsonCodec = new JsonCodecWrapper(codec);
|
||||
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("JSON.MGET", new MapGetAllDecoder(keysList, 0));
|
||||
return commandExecutor.readBatchedAsync(jsonCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() {
|
||||
final Map<String, V> results = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void onSlotResult(List<Object> keys, Map<Object, Object> result) {
|
||||
for (Map.Entry<Object, Object> entry : result.entrySet()) {
|
||||
if (entry.getKey() != null && entry.getValue() != null) {
|
||||
String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey());
|
||||
results.put(key, (V) entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, V> onFinish() {
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedisCommand<Map<Object, Object>> createCommand(List<Object> keys) {
|
||||
return new RedisCommand<>("JSON.MGET", new BucketsDecoder(keys));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] createParams(List<Object> params) {
|
||||
ArrayList<Object> newParams = new ArrayList<>(params);
|
||||
newParams.add(path);
|
||||
return newParams.toArray();
|
||||
}
|
||||
}, keysList.toArray(new Object[0]));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(Map<String, ?> buckets) {
|
||||
commandExecutor.get(setAsync(buckets));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<Void> setAsync(Map<String, ?> buckets) {
|
||||
return setAsync(codec, ".", buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(JsonCodec codec, String path, Map<String, ?> buckets) {
|
||||
commandExecutor.get(setAsync(codec, path, buckets));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RFuture<Void> setAsync(JsonCodec codec, String path, Map<String, ?> buckets) {
|
||||
if (buckets.isEmpty()) {
|
||||
return new CompletableFutureWrapper<>((Void) null);
|
||||
}
|
||||
|
||||
Map<String, ?> mappedBuckets = buckets.entrySet().stream().collect(
|
||||
Collectors.toMap(e -> commandExecutor.getServiceManager().getConfig().getNameMapper().map(e.getKey()),
|
||||
Map.Entry::getValue));
|
||||
JsonCodecWrapper jsonCodec = new JsonCodecWrapper(codec);
|
||||
return commandExecutor.writeBatchedAsync(jsonCodec, RedisCommands.JSON_MSET, new VoidSlotCallback() {
|
||||
@Override
|
||||
public Object[] createParams(List<Object> keys) {
|
||||
List<Object> params = new ArrayList<>(keys.size());
|
||||
for (Object key : keys) {
|
||||
params.add(key);
|
||||
params.add(path);
|
||||
try {
|
||||
params.add(jsonCodec.getValueEncoder().encode(mappedBuckets.get(key)));
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
return params.toArray();
|
||||
}
|
||||
}, mappedBuckets.keySet().toArray(new Object[0]));
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2024 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import org.redisson.codec.JsonCodec;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface RJsonBuckets extends RJsonBucketsAsync {
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with default path
|
||||
*
|
||||
* @param keys keys
|
||||
* @param <V> type of object with specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Map<String, V> get(String... keys);
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with specific path
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param keys keys
|
||||
* @param <V> type of value at specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Map<String, V> get(JsonCodec codec, String path, String... keys);
|
||||
|
||||
/**
|
||||
* Saves json objects with default path mapped by Redis key.
|
||||
*
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
void set(Map<String, ?> buckets);
|
||||
|
||||
/**
|
||||
* Saves json objects with specific path mapped by Redis key.
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
void set(JsonCodec codec, String path, Map<String, ?> buckets);
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2024 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import org.redisson.codec.JsonCodec;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface RJsonBucketsAsync {
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with default path
|
||||
*
|
||||
* @param keys keys
|
||||
* @param <V> type of object with specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> RFuture<Map<String, V>> getAsync(String... keys);
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with specific path
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param keys keys
|
||||
* @param <V> type of value at specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> RFuture<Map<String, V>> getAsync(JsonCodec codec, String path, String... keys);
|
||||
|
||||
/**
|
||||
* Saves json objects with default path mapped by Redis key.
|
||||
*
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
RFuture<Void> setAsync(Map<String, ?> buckets);
|
||||
|
||||
/**
|
||||
* Saves json objects with specific path mapped by Redis key.
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
RFuture<Void> setAsync(JsonCodec codec, String path, Map<String, ?> buckets);
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2024 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import org.redisson.codec.JsonCodec;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface RJsonBucketsReactive {
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with default path
|
||||
*
|
||||
* @param keys keys
|
||||
* @param <V> type of object with specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Mono<Map<String, V>> get(String... keys);
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with specific path
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param keys keys
|
||||
* @param <V> type of value at specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Mono<Map<String, V>> get(JsonCodec codec, String path, String... keys);
|
||||
|
||||
/**
|
||||
* Saves json objects with default path mapped by Redis key.
|
||||
*
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
Mono<Void> set(Map<String, ?> buckets);
|
||||
|
||||
/**
|
||||
* Saves json objects with specific path mapped by Redis key.
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
Mono<Void> set(JsonCodec codec, String path, Map<String, ?> buckets);
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2024 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.api;
|
||||
|
||||
import io.reactivex.rxjava3.core.Completable;
|
||||
import io.reactivex.rxjava3.core.Single;
|
||||
import org.redisson.codec.JsonCodec;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface RJsonBucketsRx {
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with default path
|
||||
*
|
||||
* @param keys keys
|
||||
* @param <V> type of object with specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Single<Map<String, V>> get(String... keys);
|
||||
|
||||
/**
|
||||
* Returns Redis json object mapped by key with specific path
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param keys keys
|
||||
* @param <V> type of value at specific json-path
|
||||
* @return Map with name as key and bucket as value
|
||||
*/
|
||||
<V> Single<Map<String, V>> get(JsonCodec codec, String path, String... keys);
|
||||
|
||||
/**
|
||||
* Saves json objects with default path mapped by Redis key.
|
||||
*
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
Completable set(Map<String, ?> buckets);
|
||||
|
||||
/**
|
||||
* Saves json objects with specific path mapped by Redis key.
|
||||
*
|
||||
* @param codec codec for specific path
|
||||
* @param path json path
|
||||
* @param buckets map of json buckets
|
||||
*/
|
||||
Completable set(JsonCodec codec, String path, Map<String, ?> buckets);
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
package org.redisson;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.redisson.api.RJsonBuckets;
|
||||
import org.redisson.client.codec.IntegerCodec;
|
||||
import org.redisson.codec.JacksonCodec;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.redisson.RedissonJsonBucketTest.NestedType;
|
||||
import static org.redisson.RedissonJsonBucketTest.TestType;
|
||||
|
||||
public class RedissonJsonBucketsTest extends DockerRedisStackTest {
|
||||
|
||||
@Test
|
||||
public void testSetAndGet() {
|
||||
RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class));
|
||||
Map<String, TestType> map = new HashMap<>();
|
||||
IntStream.range(0, 1000).forEach(i -> {
|
||||
TestType testType = new TestType();
|
||||
testType.setName("name" + i);
|
||||
map.put(testType.getName(), testType);
|
||||
});
|
||||
buckets.set(map);
|
||||
|
||||
Map<String, TestType> stringObjectMap = buckets.get(map.keySet().toArray(new String[]{}));
|
||||
assertThat(stringObjectMap.size()).isEqualTo(1000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWithPath() {
|
||||
RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class));
|
||||
Map<String, TestType> map = new HashMap<>();
|
||||
IntStream.range(0, 1000).forEach(i -> {
|
||||
TestType testType = new TestType();
|
||||
testType.setName("name" + i);
|
||||
NestedType nestedType = new NestedType();
|
||||
nestedType.setValue(i);
|
||||
testType.setType(nestedType);
|
||||
map.put(testType.getName(), testType);
|
||||
});
|
||||
buckets.set(map);
|
||||
|
||||
Map<String, List<Integer>> result = buckets.get(new JacksonCodec<>(new TypeReference<List<Integer>>() {}), "$.type.value", map.keySet().toArray(new String[]{}));
|
||||
assertThat(result.size()).isEqualTo(1000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetWithPath() {
|
||||
RJsonBuckets buckets = redisson.getJsonBuckets(new JacksonCodec<>(TestType.class));
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
IntStream.range(0, 1000).forEach(i -> {
|
||||
TestType testType = new TestType();
|
||||
testType.setName("name" + i);
|
||||
NestedType nestedType = new NestedType();
|
||||
nestedType.setValue(i);
|
||||
testType.setType(nestedType);
|
||||
map.put(testType.getName(), testType);
|
||||
});
|
||||
buckets.set(map);
|
||||
|
||||
map.clear();
|
||||
IntStream.range(0, 1000).forEach(i -> {
|
||||
map.put("name" + i, i + 1);
|
||||
});
|
||||
buckets.set(new IntegerCodec(), "$.type.value", map);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue