Fixed - Old value of RLiveObject's field isn't removed from index #3044

pull/3048/head
Nikita Koksharov 4 years ago
parent 6d70da7cc6
commit ce92eca96b

@ -610,28 +610,22 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override @Override
public <K> Iterable<K> findIds(Class<?> entityClass, int count) { public <K> Iterable<K> findIds(Class<?> entityClass, int count) {
try { NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
String idFieldName = getRIdFieldName(entityClass); String pattern = namingScheme.getNamePattern(entityClass);
Class<?> idFieldType = ClassUtils.getDeclaredField(entityClass, idFieldName).getType(); RedissonKeys keys = new RedissonKeys(connectionManager.getCommandExecutor());
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
String pattern = namingScheme.getNamePattern(entityClass, idFieldType, idFieldName); RedisCommand<ListScanResult<String>> command = new RedisCommand<>("SCAN",
RedissonKeys keys = new RedissonKeys(connectionManager.getCommandExecutor()); new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()), new Convertor<Object>() {
@Override
RedisCommand<ListScanResult<String>> command = new RedisCommand<>("SCAN", public Object convert(Object obj) {
new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()), new Convertor<Object>() { if (!(obj instanceof String)) {
@Override return obj;
public Object convert(Object obj) {
if (!(obj instanceof String)) {
return obj;
}
return namingScheme.resolveId(obj.toString());
} }
}); return namingScheme.resolveId(obj.toString());
}
});
return keys.getKeysByPattern(command, pattern, 0, count); return keys.getKeysByPattern(command, pattern, 0, count);
} catch (NoSuchFieldException e) {
throw new IllegalStateException(e);
}
} }
@Override @Override

@ -15,24 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.io.IOException; import io.netty.buffer.ByteBuf;
import java.util.ArrayList; import org.redisson.api.*;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.ObjectListener;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.api.RPatternTopic;
import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -43,7 +27,11 @@ import org.redisson.misc.Hash;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/** /**
* Base Redisson object * Base Redisson object
@ -295,48 +283,15 @@ public abstract class RedissonObject implements RObject {
} }
public ByteBuf encode(Object value) { public ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) { return commandExecutor.encode(codec, value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
} }
public ByteBuf encodeMapKey(Object value) { public ByteBuf encodeMapKey(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) { return commandExecutor.encodeMapKey(codec, value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapKeyEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
} }
public ByteBuf encodeMapValue(Object value) { public ByteBuf encodeMapValue(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) { return commandExecutor.encodeMapValue(codec, value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
} }
@Override @Override

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import io.netty.buffer.ByteBuf;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RScript; import org.redisson.api.RScript;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -24,7 +23,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import java.io.IOException;
import java.util.*; import java.util.*;
/** /**
@ -195,26 +193,11 @@ public class RedissonScript implements RScript {
private List<Object> encode(Collection<?> values, Codec codec) { private List<Object> encode(Collection<?> values, Codec codec) {
List<Object> result = new ArrayList<Object>(values.size()); List<Object> result = new ArrayList<Object>(values.size());
for (Object object : values) { for (Object object : values) {
result.add(encode(object, codec)); result.add(commandExecutor.encode(codec, object));
} }
return result; return result;
} }
private ByteBuf encode(Object value, Codec codec) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override @Override
public <R> RFuture<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, public <R> RFuture<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType,
List<Object> keys, Object... values) { List<Object> keys, Object... values) {

@ -15,10 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
@ -39,7 +35,8 @@ import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService; import org.redisson.pubsub.PublishSubscribeService;
import io.netty.buffer.ByteBuf; import java.util.Collections;
import java.util.List;
/** /**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
@ -66,7 +63,7 @@ public class RedissonTopic implements RTopic {
this.codec = codec; this.codec = codec;
this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
} }
@Override @Override
public List<String> getChannelNames() { public List<String> getChannelNames() {
return Collections.singletonList(name); return Collections.singletonList(name);
@ -79,24 +76,9 @@ public class RedissonTopic implements RTopic {
@Override @Override
public RFuture<Long> publishAsync(Object message) { public RFuture<Long> publishAsync(Object message) {
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, encode(message)); return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PUBLISH, name, commandExecutor.encode(codec, message));
} }
protected ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override @Override
public int addListener(StatusListener listener) { public int addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name)); return addListener(new PubSubStatusListener(listener, name));
@ -107,13 +89,13 @@ public class RedissonTopic implements RTopic {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, (MessageListener<M>) listener, name); PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, (MessageListener<M>) listener, name);
return addListener(pubSubListener); return addListener(pubSubListener);
} }
@Override @Override
public RFuture<Integer> addListenerAsync(StatusListener listener) { public RFuture<Integer> addListenerAsync(StatusListener listener) {
PubSubStatusListener pubSubListener = new PubSubStatusListener(listener, name); PubSubStatusListener pubSubListener = new PubSubStatusListener(listener, name);
return addListenerAsync((RedisPubSubListener<?>) pubSubListener); return addListenerAsync((RedisPubSubListener<?>) pubSubListener);
} }
@Override @Override
public <M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener) { public <M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, listener, name); PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(type, listener, name);
@ -125,7 +107,7 @@ public class RedissonTopic implements RTopic {
commandExecutor.syncSubscription(future); commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener); return System.identityHashCode(pubSubListener);
} }
private RFuture<Integer> addListenerAsync(RedisPubSubListener<?> pubSubListener) { private RFuture<Integer> addListenerAsync(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, channelName, pubSubListener); RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, channelName, pubSubListener);
RPromise<Integer> result = new RedissonPromise<Integer>(); RPromise<Integer> result = new RedissonPromise<Integer>();
@ -134,7 +116,7 @@ public class RedissonTopic implements RTopic {
result.tryFailure(e); result.tryFailure(e);
return; return;
} }
result.trySuccess(System.identityHashCode(pubSubListener)); result.trySuccess(System.identityHashCode(pubSubListener));
}); });
return result; return result;
@ -144,7 +126,7 @@ public class RedissonTopic implements RTopic {
public void removeAllListeners() { public void removeAllListeners() {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore); acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) { if (entry == null) {
semaphore.release(); semaphore.release();
@ -165,12 +147,12 @@ public class RedissonTopic implements RTopic {
throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic");
} }
} }
@Override @Override
public void removeListener(MessageListener<?> listener) { public void removeListener(MessageListener<?> listener) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore); acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) { if (entry == null) {
semaphore.release(); semaphore.release();
@ -185,7 +167,7 @@ public class RedissonTopic implements RTopic {
} }
} }
@Override @Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) { public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
RPromise<Void> promise = new RedissonPromise<Void>(); RPromise<Void> promise = new RedissonPromise<Void>();
@ -197,7 +179,7 @@ public class RedissonTopic implements RTopic {
promise.trySuccess(null); promise.trySuccess(null);
return; return;
} }
entry.removeListener(channelName, listener); entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) { if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore) subscribeService.unsubscribe(channelName, semaphore)
@ -235,12 +217,12 @@ public class RedissonTopic implements RTopic {
}); });
return promise; return promise;
} }
@Override @Override
public void removeListener(Integer... listenerIds) { public void removeListener(Integer... listenerIds) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore); acquire(semaphore);
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) { if (entry == null) {
semaphore.release(); semaphore.release();

@ -15,9 +15,7 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.util.Collection; import io.netty.buffer.ByteBuf;
import java.util.List;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -31,6 +29,9 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.liveobject.core.RedissonObjectBuilder;
import java.util.Collection;
import java.util.List;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -48,8 +49,6 @@ public interface CommandAsyncExecutor {
CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonReactive); CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonReactive);
boolean isRedissonReferenceSupportEnabled();
<V> RedisException convertException(RFuture<V> future); <V> RedisException convertException(RFuture<V> future);
void syncSubscription(RFuture<?> future); void syncSubscription(RFuture<?> future);
@ -118,6 +117,12 @@ public interface CommandAsyncExecutor {
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames); <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames);
ByteBuf encode(Codec codec, Object value);
ByteBuf encodeMapKey(Codec codec, Object value);
ByteBuf encodeMapValue(Codec codec, Object value);
<T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys); <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys);
<T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys); <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys);

@ -15,6 +15,7 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.io.IOException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.redisson.RedissonReference;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.*; import org.redisson.api.*;
import org.redisson.cache.LRUCacheMap; import org.redisson.cache.LRUCacheMap;
@ -104,8 +106,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec); codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
} }
@Override private boolean isRedissonReferenceSupportEnabled() {
public boolean isRedissonReferenceSupportEnabled() {
return objectBuilder != null; return objectBuilder != null;
} }
@ -694,7 +695,55 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public RedissonObjectBuilder getObjectBuilder() { public RedissonObjectBuilder getObjectBuilder() {
return objectBuilder; return objectBuilder;
} }
@Override
public ByteBuf encode(Codec codec, Object value) {
if (isRedissonReferenceSupportEnabled()) {
RedissonReference reference = getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public ByteBuf encodeMapKey(Codec codec, Object value) {
if (isRedissonReferenceSupportEnabled()) {
RedissonReference reference = getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapKeyEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public ByteBuf encodeMapValue(Codec codec, Object value) {
if (isRedissonReferenceSupportEnabled()) {
RedissonReference reference = getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override @Override
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames) { public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames) {
if (connectionManager.isClusterMode() && queueNames.length > 0) { if (connectionManager.isClusterMode() && queueNames.length > 0) {

@ -15,6 +15,7 @@
*/ */
package org.redisson.liveobject.core; package org.redisson.liveobject.core;
import io.netty.buffer.ByteBuf;
import net.bytebuddy.implementation.bind.annotation.*; import net.bytebuddy.implementation.bind.annotation.*;
import org.redisson.RedissonReference; import org.redisson.RedissonReference;
import org.redisson.RedissonScoredSortedSet; import org.redisson.RedissonScoredSortedSet;
@ -23,6 +24,8 @@ import org.redisson.api.*;
import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.REntity.TransformationMode; import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.api.annotation.RIndex; import org.redisson.api.annotation.RIndex;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -32,6 +35,7 @@ import org.redisson.liveobject.resolver.NamingScheme;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -110,12 +114,10 @@ public class AccessorInterceptor {
if (commandExecutor instanceof CommandBatchService) { if (commandExecutor instanceof CommandBatchService) {
liveMap.fastPutAsync(fieldName, new RedissonReference(rEntity, liveMap.fastPutAsync(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject), ns.getName(rEntity, liveObject.getLiveObjectId())));
liveObject.getLiveObjectId())));
} else { } else {
liveMap.fastPut(fieldName, new RedissonReference(rEntity, liveMap.fastPut(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject), ns.getName(rEntity, liveObject.getLiveObjectId())));
liveObject.getLiveObjectId())));
} }
return me; return me;
@ -152,10 +154,8 @@ public class AccessorInterceptor {
return me; return me;
} }
if (arg == null) { removeIndex(liveMap, me, field);
Object oldArg = liveMap.remove(fieldName); if (arg != null) {
removeIndex(me, oldArg, fieldName, field);
} else {
storeIndex(field, me, arg); storeIndex(field, me, arg);
if (commandExecutor instanceof CommandBatchService) { if (commandExecutor instanceof CommandBatchService) {
@ -169,13 +169,13 @@ public class AccessorInterceptor {
return superMethod.call(); return superMethod.call();
} }
private void removeIndex(Object me, Object oldArg, String fieldName, Field field) { private void removeIndex(RMap<String, Object> liveMap, Object me, Field field) {
if (field.getAnnotation(RIndex.class) == null) { if (field.getAnnotation(RIndex.class) == null) {
return; return;
} }
NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(me.getClass().getSuperclass()); NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), fieldName); String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), field.getName());
CommandBatchService ce; CommandBatchService ce;
if (commandExecutor instanceof CommandBatchService) { if (commandExecutor instanceof CommandBatchService) {
@ -184,21 +184,40 @@ public class AccessorInterceptor {
ce = new CommandBatchService(connectionManager); ce = new CommandBatchService(connectionManager);
} }
if (oldArg instanceof Number) { if (Number.class.isAssignableFrom(field.getType())) {
RScoredSortedSetAsync<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), ce, indexName, null); RScoredSortedSetAsync<Object> set = new RedissonScoredSortedSet<>(namingScheme.getCodec(), ce, indexName, null);
set.removeAsync(((RLiveObject) me).getLiveObjectId()); set.removeAsync(((RLiveObject) me).getLiveObjectId());
} else { } else {
RMultimapAsync<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName); if (ClassUtils.isAnnotationPresent(field.getType(), REntity.class)) {
if (oldArg instanceof RLiveObject) { Object value = liveMap.remove(field.getName());
map.removeAsync(((RLiveObject) oldArg).getLiveObjectId(), ((RLiveObject) me).getLiveObjectId()); RMultimapAsync<Object, Object> map = new RedissonSetMultimap<>(namingScheme.getCodec(), ce, indexName);
map.removeAsync(((RLiveObject) value).getLiveObjectId(), ((RLiveObject) me).getLiveObjectId());
} else { } else {
map.removeAsync(oldArg, ((RLiveObject) me).getLiveObjectId()); removeAsync(ce, indexName, liveMap.getName(), namingScheme.getCodec(), ((RLiveObject) me).getLiveObjectId(), field.getName());
} }
} }
ce.execute(); ce.execute();
} }
private RFuture<Boolean> removeAsync(CommandBatchService ce, String name, String mapName, Codec codec, Object value, String fieldName) {
ByteBuf valueState = ce.encodeMapValue(codec, value);
return ce.evalWriteAsync(name, codec, RedisCommands.EVAL_VOID,
"local oldArg = redis.call('hget', KEYS[2], ARGV[2]);" +
"if oldArg == false then " +
"return; " +
"end;" +
"redis.call('hdel', KEYS[2], ARGV[2]); " +
"local hash = redis.call('hget', KEYS[1], oldArg); " +
"local setName = KEYS[1] .. ':' .. hash; " +
"local res = redis.call('srem', setName, ARGV[1]); " +
"if res == 1 and redis.call('scard', setName) == 0 then " +
"redis.call('hdel', KEYS[1], oldArg); " +
"end; ",
Arrays.asList(name, mapName),
valueState, ce.encodeMapKey(codec, fieldName));
}
private void storeIndex(Field field, Object me, Object arg) { private void storeIndex(Field field, Object me, Object arg) {
if (field.getAnnotation(RIndex.class) == null) { if (field.getAnnotation(RIndex.class) == null) {
return; return;

@ -136,7 +136,7 @@ public class LiveObjectInterceptor {
private String getMapKey(Object id) { private String getMapKey(Object id) {
return namingScheme.getName(originalClass, idFieldType, idFieldName, id); return namingScheme.getName(originalClass, id);
} }
} }

@ -15,51 +15,25 @@
*/ */
package org.redisson.liveobject.core; package org.redisson.liveobject.core;
import java.lang.reflect.Field; import org.redisson.*;
import java.lang.reflect.Method; import org.redisson.api.*;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonBlockingDeque;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonDeque;
import org.redisson.RedissonList;
import org.redisson.RedissonLiveObjectService;
import org.redisson.RedissonMap;
import org.redisson.RedissonQueue;
import org.redisson.RedissonReference;
import org.redisson.RedissonSet;
import org.redisson.RedissonSortedSet;
import org.redisson.api.RLiveObject;
import org.redisson.api.RMap;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.RObjectRx;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RId;
import org.redisson.api.annotation.RObjectField; import org.redisson.api.annotation.RObjectField;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.codec.DefaultReferenceCodecProvider; import org.redisson.codec.DefaultReferenceCodecProvider;
import org.redisson.codec.ReferenceCodecProvider; import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.liveobject.misc.ClassUtils; import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme; import org.redisson.liveobject.resolver.NamingScheme;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
/** /**
* *
* @author Rui Gu * @author Rui Gu
@ -144,7 +118,7 @@ public class RedissonObjectBuilder {
if (mappedClass != null) { if (mappedClass != null) {
Codec fieldCodec = getFieldCodec(clazz, mappedClass, fieldName); Codec fieldCodec = getFieldCodec(clazz, mappedClass, fieldName);
NamingScheme fieldNamingScheme = getNamingScheme(clazz, fieldCodec); NamingScheme fieldNamingScheme = getNamingScheme(clazz, fieldCodec);
String referenceName = fieldNamingScheme.getFieldReferenceName(clazz, id, mappedClass, fieldName, null); String referenceName = fieldNamingScheme.getFieldReferenceName(clazz, id, mappedClass, fieldName);
return createRObject(redisson, mappedClass, referenceName, fieldCodec); return createRObject(redisson, mappedClass, referenceName, fieldCodec);
} }
@ -305,13 +279,9 @@ public class RedissonObjectBuilder {
if (object instanceof RLiveObject) { if (object instanceof RLiveObject) {
Class<? extends Object> rEntity = object.getClass().getSuperclass(); Class<? extends Object> rEntity = object.getClass().getSuperclass();
NamingScheme ns = getNamingScheme(rEntity); NamingScheme ns = getNamingScheme(rEntity);
String name = Introspectior
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();
Class<?> type = ClassUtils.getDeclaredField(rEntity, name).getType();
return new RedissonReference(rEntity, return new RedissonReference(rEntity,
ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId())); ns.getName(rEntity, ((RLiveObject) object).getLiveObjectId()));
} }
} catch (Exception e) { } catch (Exception e) {
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);

@ -36,27 +36,27 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS
} }
@Override @Override
public String getNamePattern(Class<?> entityClass, Class<?> idFieldClass, String idFieldName) { public String getNamePattern(Class<?> entityClass) {
return "redisson_live_object:{" + "*" + "}:" + entityClass.getName() + ":" + idFieldName + ":" + idFieldClass.getName(); return "redisson_live_object:{" + "*" + "}:" + entityClass.getName();
} }
@Override @Override
public String getName(Class<?> entityClass, Class<?> idFieldClass, String idFieldName, Object idValue) { public String getName(Class<?> entityClass, Object idValue) {
try { try {
String encode = bytesToHex(codec.getMapKeyEncoder().encode(idValue)); String encode = bytesToHex(codec.getMapKeyEncoder().encode(idValue));
return "redisson_live_object:{"+ encode + "}:" + entityClass.getName() + ":" + idFieldName + ":" + idFieldClass.getName(); return "redisson_live_object:{"+ encode + "}:" + entityClass.getName();
} catch (IOException ex) { } catch (IOException ex) {
throw new IllegalArgumentException("Unable to encode \"" + idFieldName + "\" [" + idValue + "] into byte[]", ex); throw new IllegalArgumentException("Unable create name for '" + entityClass + "' with id:" + idValue, ex);
} }
} }
@Override @Override
public String getFieldReferenceName(Class<?> entityClass, Object idValue, Class<?> fieldClass, String fieldName, Object fieldValue) { public String getFieldReferenceName(Class<?> entityClass, Object idValue, Class<?> fieldClass, String fieldName) {
try { try {
String encode = bytesToHex(codec.getMapKeyEncoder().encode(idValue)); String encode = bytesToHex(codec.getMapKeyEncoder().encode(idValue));
return "redisson_live_object_field:{" + encode + "}:" + entityClass.getName() + ":" + fieldName + ":" + fieldClass.getName(); return "redisson_live_object_field:{" + encode + "}:" + entityClass.getName() + ":" + fieldName;
} catch (IOException ex) { } catch (IOException ex) {
throw new IllegalArgumentException("Unable to encode \"" + fieldName + "\" [" + fieldValue + "] into byte[]", ex); throw new IllegalArgumentException("Unable create name for '" + entityClass + "' and field:'" + fieldName + "' with id:" + idValue, ex);
} }
} }

@ -24,13 +24,13 @@ import org.redisson.client.codec.Codec;
*/ */
public interface NamingScheme { public interface NamingScheme {
String getNamePattern(Class<?> entityClass, Class<?> idFieldClass, String idFieldName); String getNamePattern(Class<?> entityClass);
String getName(Class<?> entityClass, Class<?> idFieldClass, String idFieldName, Object idValue); String getName(Class<?> entityClass, Object idValue);
String getIndexName(Class<?> entityClass, String fieldName); String getIndexName(Class<?> entityClass, String fieldName);
String getFieldReferenceName(Class<?> entityClass, Object idValue, Class<?> fieldClass, String fieldName, Object fieldValue); String getFieldReferenceName(Class<?> entityClass, Object idValue, Class<?> fieldClass, String fieldName);
Object resolveId(String name); Object resolveId(String name);

@ -630,6 +630,24 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
assertThat(ids3).isEqualTo(2); assertThat(ids3).isEqualTo(2);
} }
@Test
public void testIndexUpdate() {
RLiveObjectService s = redisson.getLiveObjectService();
TestIndexed t1 = new TestIndexed("1");
t1.setName1("test1");
t1 = s.persist(t1);
Collection<TestIndexed> objects0 = s.find(TestIndexed.class, Conditions.eq("name1", "test1"));
assertThat(objects0.iterator().next().getId()).isEqualTo(t1.getId());
t1.setName1("test2");
Collection<TestIndexed> objects2 = s.find(TestIndexed.class, Conditions.eq("name1", "test1"));
assertThat(objects2.isEmpty()).isTrue();
Collection<TestIndexed> objects3 = s.find(TestIndexed.class, Conditions.eq("name1", "test2"));
assertThat(objects3.iterator().next().getId()).isEqualTo(t1.getId());
}
@Test @Test
public void testFindEq() { public void testFindEq() {
RLiveObjectService s = redisson.getLiveObjectService(); RLiveObjectService s = redisson.getLiveObjectService();
@ -703,16 +721,16 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
assertEquals("1", t.getName()); assertEquals("1", t.getName());
DefaultNamingScheme scheme = new DefaultNamingScheme(redisson.getConfig().getCodec()); DefaultNamingScheme scheme = new DefaultNamingScheme(redisson.getConfig().getCodec());
assertTrue(redisson.getMap(scheme.getName(TestREntity.class, String.class, "name", "1")).isExists()); assertTrue(redisson.getMap(scheme.getName(TestREntity.class, "1")).isExists());
t.setName("3333"); t.setName("3333");
assertEquals("3333", t.getName()); assertEquals("3333", t.getName());
assertTrue(redisson.getMap(scheme.getName(TestREntity.class, String.class, "name", "3333")).isExists()); assertTrue(redisson.getMap(scheme.getName(TestREntity.class, "3333")).isExists());
t.setValue("111"); t.setValue("111");
assertEquals("111", t.getValue()); assertEquals("111", t.getValue());
assertTrue(redisson.getMap(scheme.getName(TestREntity.class, String.class, "name", "3333")).isExists()); assertTrue(redisson.getMap(scheme.getName(TestREntity.class, "3333")).isExists());
assertTrue(!redisson.getMap(scheme.getName(TestREntity.class, String.class, "name", "1")).isExists()); assertTrue(!redisson.getMap(scheme.getName(TestREntity.class, "1")).isExists());
assertEquals("111", redisson.getMap(scheme.getName(TestREntity.class, String.class, "name", "3333")).get("value")); assertEquals("111", redisson.getMap(scheme.getName(TestREntity.class, "3333")).get("value"));
// ((RLiveObject) t).getLiveObjectLiveMap().put("value", "555"); // ((RLiveObject) t).getLiveObjectLiveMap().put("value", "555");
// assertEquals("555", redisson.getMap(REntity.DefaultNamingScheme.INSTANCE.getName(TestREntity.class, "name", "3333")).get("value")); // assertEquals("555", redisson.getMap(REntity.DefaultNamingScheme.INSTANCE.getName(TestREntity.class, "name", "3333")).get("value"));
@ -2150,7 +2168,7 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
} }
} }
@Test(timeout = 30*1000) @Test(timeout = 40*1000)
public void testBatchedPersist() { public void testBatchedPersist() {
RLiveObjectService s = redisson.getLiveObjectService(); RLiveObjectService s = redisson.getLiveObjectService();

Loading…
Cancel
Save