Fixed - RLiveObjectService.findIds() method implemented. #2779

pull/2791/head
Nikita Koksharov 5 years ago
parent 7814100618
commit 27850cf17d

@ -36,6 +36,7 @@ import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
@ -89,17 +90,21 @@ public class RedissonKeys implements RKeys {
@Override
public Iterable<String> getKeysByPattern(String pattern, int count) {
List<Iterable<String>> iterables = new ArrayList<Iterable<String>>();
return getKeysByPattern(RedisCommands.SCAN, pattern, count);
}
public <T> Iterable<T> getKeysByPattern(RedisCommand<?> command, String pattern, int count) {
List<Iterable<T>> iterables = new ArrayList<>();
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
Iterable<String> iterable = new Iterable<String>() {
Iterable<T> iterable = new Iterable<T>() {
@Override
public Iterator<String> iterator() {
return createKeysIterator(entry, pattern, count);
public Iterator<T> iterator() {
return createKeysIterator(entry, command, pattern, count);
}
};
iterables.add(iterable);
}
return new CompositeIterable<String>(iterables);
return new CompositeIterable<T>(iterables);
}
@Override
@ -112,23 +117,28 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null, count);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
String pattern, int count) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
String pattern, int count) {
if (pattern == null) {
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT",
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT",
count);
}
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH",
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "MATCH",
pattern, "COUNT", count);
}
private Iterator<String> createKeysIterator(MasterSlaveEntry entry, String pattern, int count) {
return new RedissonBaseIterator<String>() {
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
String pattern, int count) {
return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, "COUNT", count);
}
private <T> Iterator<T> createKeysIterator(MasterSlaveEntry entry, RedisCommand<?> command, String pattern, int count) {
return new RedissonBaseIterator<T>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return commandExecutor
.get(RedissonKeys.this.scanIteratorAsync(client, entry, nextIterPos, pattern, count));
.get(RedissonKeys.this.scanIteratorAsync(client, entry, command, nextIterPos, pattern, count));
}
@Override
@ -234,7 +244,7 @@ public class RedissonKeys implements RKeys {
public void run() {
long count = 0;
try {
Iterator<String> keysIterator = createKeysIterator(entry, pattern, batchSize);
Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize);
List<String> keys = new ArrayList<String>();
while (keysIterator.hasNext()) {
String key = keysIterator.next();

@ -29,6 +29,12 @@ import net.bytebuddy.matcher.ElementMatchers;
import org.redisson.api.*;
import org.redisson.api.annotation.*;
import org.redisson.api.condition.Condition;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
@ -38,6 +44,7 @@ import org.redisson.liveobject.core.*;
import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.liveobject.resolver.RIdResolver;
import java.lang.reflect.Constructor;
@ -163,6 +170,8 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <T> List<T> persist(T... detachedObjects) {
CommandBatchService commandExecutor = new CommandBatchService(connectionManager);
commandExecutor.setObjectBuilder(connectionManager.getCommandExecutor().getObjectBuilder());
Map<Class<?>, Class<?>> classCache = new HashMap<>();
Map<T, Object> detached2Attached = new LinkedHashMap<>();
Map<String, Object> name2id = new HashMap<>();
@ -261,7 +270,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
if (rObject instanceof Collection) {
Collection coll = ((Collection) rObject);
Collection coll = (Collection) rObject;
if (type == RCascadeType.MERGE) {
coll.clear();
}
@ -550,6 +559,32 @@ public class RedissonLiveObjectService implements RLiveObjectService {
return asLiveObject(entity).delete();
}
@Override
public <K> Iterable<K> findIds(Class<?> entityClass) {
try {
String idFieldName = getRIdFieldName(entityClass);
Class<?> idFieldType = ClassUtils.getDeclaredField(entityClass, idFieldName).getType();
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(entityClass);
String pattern = namingScheme.getNamePattern(entityClass, idFieldType, idFieldName);
RedissonKeys keys = new RedissonKeys(connectionManager.getCommandExecutor());
RedisCommand<ListScanResult<String>> command = new RedisCommand<>("SCAN",
new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()), new Convertor<Object>() {
@Override
public Object convert(Object obj) {
if (!(obj instanceof String)) {
return obj;
}
return namingScheme.resolveId(obj.toString());
}
});
return keys.getKeysByPattern(command, pattern, 10);
} catch (NoSuchFieldException e) {
throw new IllegalStateException(e);
}
}
@Override
public <T> RLiveObject asLiveObject(T instance) {
return (RLiveObject) instance;

@ -70,6 +70,15 @@ public interface RLiveObjectService {
*/
<T> Collection<T> find(Class<T> entityClass, Condition condition);
/**
* Returns all entry ids by specified <code>entityClass</code>.
*
* @param entityClass - entity class
* @param <K> Key type
* @return collection of ids or empty collection.
*/
<K> Iterable<K> findIds(Class<?> entityClass);
/**
* Returns proxied object for the detached object. Discard all the
* field values already in the detached instance.

@ -68,7 +68,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager;
RedissonObjectBuilder objectBuilder;
protected RedissonObjectBuilder objectBuilder;
public CommandAsyncService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;

@ -28,6 +28,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -116,6 +117,10 @@ public class CommandBatchService extends CommandAsyncService {
super(connectionManager);
this.options = options;
}
public void setObjectBuilder(RedissonObjectBuilder objectBuilder) {
this.objectBuilder = objectBuilder;
}
public BatchOptions getOptions() {
return options;

@ -149,14 +149,18 @@ public class AccessorInterceptor {
}
if (arg instanceof RObject) {
connectionManager.getCommandExecutor().getObjectBuilder().store((RObject) arg, fieldName, liveMap);
if (commandExecutor instanceof CommandBatchService) {
commandExecutor.getObjectBuilder().storeAsync((RObject) arg, fieldName, liveMap);
} else {
commandExecutor.getObjectBuilder().store((RObject) arg, fieldName, liveMap);
}
return me;
}
if (arg == null) {
Object oldArg = liveMap.remove(fieldName);
if (field.getAnnotation(RIndex.class) != null) {
NamingScheme namingScheme = connectionManager.getCommandExecutor().getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
NamingScheme namingScheme = commandExecutor.getObjectBuilder().getNamingScheme(me.getClass().getSuperclass());
String indexName = namingScheme.getIndexName(me.getClass().getSuperclass(), fieldName);
CommandBatchService ce;

@ -119,6 +119,15 @@ public class RedissonObjectBuilder {
public ReferenceCodecProvider getReferenceCodecProvider() {
return codecProvider;
}
public void storeAsync(RObject ar, String fieldName, RMap<String, Object> liveMap) {
Codec codec = ar.getCodec();
if (codec != null) {
codecProvider.registerCodec((Class) codec.getClass(), codec);
}
liveMap.fastPutAsync(fieldName,
new RedissonReference(ar.getClass(), ar.getName(), codec));
}
public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) {
Codec codec = ar.getCodec();

@ -35,6 +35,11 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS
super(codec);
}
@Override
public String getNamePattern(Class<?> entityClass, Class<?> idFieldClass, String idFieldName) {
return "redisson_live_object:{" + "*" + "}:" + entityClass.getName() + ":" + idFieldName + ":" + idFieldClass.getName();
}
@Override
public String getName(Class<?> entityClass, Class<?> idFieldClass, String idFieldName, Object idValue) {
try {
@ -55,17 +60,6 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS
}
}
@Override
public String resolveClassName(String name) {
return name.substring(name.lastIndexOf("}:") + 2, name.indexOf(":"));
}
@Override
public String resolveIdFieldName(String name) {
String s = name.substring(0, name.lastIndexOf(":"));
return s.substring(s.lastIndexOf(":") + 1);
}
@Override
public Object resolveId(String name) {
String decode = name.substring(name.indexOf("{") + 1, name.indexOf("}"));

@ -24,16 +24,14 @@ import org.redisson.client.codec.Codec;
*/
public interface NamingScheme {
String getNamePattern(Class<?> entityClass, Class<?> idFieldClass, String idFieldName);
String getName(Class<?> entityClass, Class<?> idFieldClass, String idFieldName, Object idValue);
String getIndexName(Class<?> entityClass, String fieldName);
String getFieldReferenceName(Class<?> entityClass, Object idValue, Class<?> fieldClass, String fieldName, Object fieldValue);
String resolveClassName(String name);
String resolveIdFieldName(String name);
Object resolveId(String name);
Codec getCodec();

@ -9,18 +9,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -1255,6 +1244,21 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
}
}
@Test
public void testFindIds() {
RLiveObjectService s = redisson.getLiveObjectService();
TestIndexed1 t1 = new TestIndexed1();
t1.setId("1");
t1.setKeywords(Collections.singletonList("132323"));
TestIndexed1 t2 = new TestIndexed1();
t2.setId("2");
t2.setKeywords(Collections.singletonList("fjdklj"));
s.persist(t1, t2);
Iterable<String> ids = s.findIds(TestIndexed1.class);
assertThat(ids).containsExactlyInAnyOrder("1", "2");
}
@Test
public void testMergeList2() {
RLiveObjectService s = redisson.getLiveObjectService();

Loading…
Cancel
Save