Fixed - Redis request/response handling performance improvement #1567

pull/1613/head
Nikita 7 years ago
parent 1faaaca92c
commit b7c44b530f

@ -47,7 +47,6 @@ import org.redisson.client.RedisTimeoutException;
import org.redisson.client.RedisTryAgainException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
@ -534,22 +533,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
final AsyncDetails<V, R> details = AsyncDetails.acquire();
if (isRedissonReferenceSupportEnabled()) {
try {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);
if (reference != null) {
params[i] = reference;
}
}
} catch (Exception e) {
connectionManager.getShutdownLatch().release();
free(params);
mainPromise.tryFailure(e);
return;
}
}
final RFuture<RedisConnection> connectionFuture = getConnection(readOnlyMode, source, command);
final RPromise<R> attemptPromise = new RedissonPromise<R>();
@ -953,7 +936,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.trySuccess(res);
}
}
protected <T> T tryHandleReference(T o) {
boolean hasConversion = false;
if (o instanceof List) {
@ -1008,47 +991,23 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
return o;
} else if (o instanceof Map) {
Map<Object, Object> map, r = (Map<Object, Object>) o;
boolean useNewMap = o instanceof LinkedHashMap;
try {
map = (Map) o.getClass().getConstructor().newInstance();
} catch (Exception e) {
map = new LinkedHashMap();
}
Map<Object, Object> r = (Map<Object, Object>) o;
for (Map.Entry<Object, Object> e : r.entrySet()) {
Map.Entry<Object, Object> ref = tryHandleReference0(e);
//Not testing for ref changes because r.put(ref.getKey(), ref.getValue())
//below needs to fail on the first iteration to be able to
//perform fall back if failure happens.
//
//Assuming the failure reason is systematic such as put method
//is not supported or implemented, and not an occasional issue
//like only one element fails.
if (useNewMap) {
map.put(ref.getKey(), ref.getValue());
} else {
try {
r.put(ref.getKey(), ref.getValue());
if (e.getKey() != ref.getKey()) {
map.put(e.getKey(), e.getValue());
}
} catch (Exception ex) {
//r is not supporting put operation. fall back to use
//a new map.
useNewMap = true;
map.put(ref.getKey(), ref.getValue());
if (e.getKey() instanceof RedissonReference
|| e.getValue() instanceof RedissonReference) {
Object key = e.getKey();
Object value = e.getValue();
if (e.getKey() instanceof RedissonReference) {
key = fromReference(e.getKey());
r.remove(e.getKey());
}
if (e.getValue() instanceof RedissonReference) {
value = fromReference(e.getValue());
}
r.put(key, value);
}
hasConversion |= ref != e;
}
if (!hasConversion) {
return o;
} else if (useNewMap) {
return (T) map;
} else if (!map.isEmpty()) {
r.keySet().removeAll(map.keySet());
}
return o;
} else if (o instanceof ListScanResult) {
tryHandleReference(((ListScanResult) o).getValues());

@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
@ -55,9 +54,7 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.CountableListener;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
@ -176,14 +173,6 @@ public class CommandBatchService extends CommandAsyncService {
Object[] batchParams = null;
if (!isRedisBasedQueue()) {
batchParams = params;
if (isRedissonReferenceSupportEnabled()) {
for (int i = 0; i < batchParams.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(connectionManager.getCfg(), batchParams[i]);
if (reference != null) {
batchParams[i] = reference;
}
}
}
}
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, batchParams, index.incrementAndGet());
entry.getCommands().add(commandData);

@ -45,8 +45,6 @@
package org.redisson.liveobject.misc;
import org.redisson.api.RObject;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@ -54,6 +52,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.redisson.api.RObject;
import org.redisson.cache.LRUCacheMap;
/**
*
@ -107,7 +109,7 @@ public class ClassUtils {
}
public static Field getDeclaredField(Class<?> clazz, String fieldName) throws NoSuchFieldException {
for (Class c : getClassHierarchy(clazz)) {
for (Class<?> c : getClassHierarchy(clazz)) {
for (Field field : c.getDeclaredFields()) {
if (field.getName().equals(fieldName)) {
return field;
@ -116,14 +118,26 @@ public class ClassUtils {
}
throw new NoSuchFieldException("No such field: " + fieldName);
}
private static final Map<Class<?>, Boolean> annotatedClasses = new LRUCacheMap<Class<?>, Boolean>(500, 0, 0);
public static boolean isAnnotationPresent(Class<?> clazz, Class<? extends Annotation> annotation) {
for (Class<?> c : getClassHierarchy(clazz)) {
if (c.isAnnotationPresent(annotation)) {
return true;
if (clazz.getName().startsWith("java.")) {
return false;
}
Boolean isAnnotated = annotatedClasses.get(clazz);
if (isAnnotated == null) {
for (Class<?> c : getClassHierarchy(clazz)) {
if (c.isAnnotationPresent(annotation)) {
annotatedClasses.put(clazz, true);
return true;
}
}
annotatedClasses.put(clazz, false);
return false;
}
return false;
return isAnnotated;
}
private static Iterable<Class<?>> getClassHierarchy(Class<?> clazz) {
@ -132,7 +146,7 @@ public class ClassUtils {
return Collections.<Class<?>>singleton(clazz);
}
List<Class<?>> classes = new ArrayList<Class<?>>();
for (Class c = clazz; c != null; c = c.getSuperclass()) {
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
classes.add(c);
}
return classes;

@ -25,7 +25,6 @@ import java.util.Map;
import org.redisson.RedissonLiveObjectService;
import org.redisson.RedissonReference;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.RedissonClient;

@ -3,11 +3,17 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -22,13 +28,17 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.BatchResult;
import org.redisson.api.ClusterNode;
import org.redisson.api.NodeType;
import org.redisson.api.RBatch;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMap;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScoredSortedSet;
@ -38,6 +48,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
@RunWith(Parameterized.class)
public class RedissonBatchTest extends BaseTest {
@ -106,6 +117,27 @@ public class RedissonBatchTest extends BaseTest {
assertThat(b2f2.get()).isEqualTo(2d);
}
@Test(timeout = 18000)
public void testPerformance() {
RMap<String, String> map = redisson.getMap("map");
Map<String, String> m = new HashMap<String, String>();
for (int j = 0; j < 1000; j++) {
m.put("" + j, "" + j);
}
map.putAll(m);
for (int i = 0; i < 10000; i++) {
RBatch rBatch = redisson.createBatch();
RMapAsync<String, String> m1 = rBatch.getMap("map");
m1.getAllAsync(m.keySet());
try {
rBatch.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void testConnectionLeakAfterError() throws InterruptedException {
Config config = createConfig();

Loading…
Cancel
Save