|
|
|
@ -21,11 +21,13 @@ import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.LinkedHashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
@ -41,6 +43,8 @@ import org.redisson.api.RedissonClient;
|
|
|
|
|
import org.redisson.api.RedissonReactiveClient;
|
|
|
|
|
import org.redisson.api.RedissonRxClient;
|
|
|
|
|
import org.redisson.cache.LRUCacheMap;
|
|
|
|
|
import org.redisson.cache.LocalCachedMessageCodec;
|
|
|
|
|
import org.redisson.cache.ReferenceCacheMap;
|
|
|
|
|
import org.redisson.client.RedisAskException;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisConnection;
|
|
|
|
@ -52,6 +56,8 @@ import org.redisson.client.RedisResponseTimeoutException;
|
|
|
|
|
import org.redisson.client.RedisTimeoutException;
|
|
|
|
|
import org.redisson.client.RedisTryAgainException;
|
|
|
|
|
import org.redisson.client.WriteRedisConnectionException;
|
|
|
|
|
import org.redisson.client.codec.BitSetCodec;
|
|
|
|
|
import org.redisson.client.codec.ByteArrayCodec;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.codec.StringCodec;
|
|
|
|
|
import org.redisson.client.protocol.CommandData;
|
|
|
|
@ -68,6 +74,7 @@ import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.connection.MasterSlaveEntry;
|
|
|
|
|
import org.redisson.connection.NodeSource;
|
|
|
|
|
import org.redisson.connection.NodeSource.Redirect;
|
|
|
|
|
import org.redisson.jcache.JCacheEventCodec;
|
|
|
|
|
import org.redisson.liveobject.core.RedissonObjectBuilder;
|
|
|
|
|
import org.redisson.misc.LogHelper;
|
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
@ -677,12 +684,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Codec codecToUse = getCodec(codec);
|
|
|
|
|
|
|
|
|
|
final AsyncDetails<V, R> details = AsyncDetails.acquire();
|
|
|
|
|
final RFuture<RedisConnection> connectionFuture = getConnection(readOnlyMode, source, command);
|
|
|
|
|
|
|
|
|
|
final RPromise<R> attemptPromise = new RedissonPromise<R>();
|
|
|
|
|
details.init(connectionFuture, attemptPromise,
|
|
|
|
|
readOnlyMode, source, codec, command, params, mainPromise, attempt);
|
|
|
|
|
readOnlyMode, source, codecToUse, command, params, mainPromise, attempt);
|
|
|
|
|
|
|
|
|
|
FutureListener<R> mainPromiseListener = new FutureListener<R>() {
|
|
|
|
|
@Override
|
|
|
|
@ -816,7 +825,48 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final Map<ClassLoader, Map<Codec, Codec>> codecs = ReferenceCacheMap.weak(0, 0);
|
|
|
|
|
|
|
|
|
|
protected Codec getCodec(Codec codec) {
|
|
|
|
|
if (codec instanceof StringCodec
|
|
|
|
|
|| codec instanceof ByteArrayCodec
|
|
|
|
|
|| codec instanceof LocalCachedMessageCodec
|
|
|
|
|
|| codec instanceof BitSetCodec
|
|
|
|
|
|| codec instanceof JCacheEventCodec
|
|
|
|
|
|| codec == null) {
|
|
|
|
|
return codec;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Codec codecToUse = codec;
|
|
|
|
|
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
|
|
|
|
|
if (threadClassLoader != null) {
|
|
|
|
|
Map<Codec, Codec> map = codecs.get(threadClassLoader);
|
|
|
|
|
if (map == null) {
|
|
|
|
|
synchronized (codecs) {
|
|
|
|
|
map = codecs.get(threadClassLoader);
|
|
|
|
|
if (map == null) {
|
|
|
|
|
map = new ConcurrentHashMap<Codec, Codec>();
|
|
|
|
|
codecs.put(threadClassLoader, map);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
codecToUse = map.get(codec);
|
|
|
|
|
if (codecToUse == null) {
|
|
|
|
|
try {
|
|
|
|
|
codecToUse = codec.getClass().getConstructor(ClassLoader.class, codec.getClass()).newInstance(threadClassLoader, codec);
|
|
|
|
|
} catch (NoSuchMethodException e) {
|
|
|
|
|
codecToUse = codec;
|
|
|
|
|
// skip
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new IllegalStateException(e);
|
|
|
|
|
}
|
|
|
|
|
map.put(codec, codecToUse);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return codecToUse;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <V> RFuture<RedisConnection> getConnection(final boolean readOnlyMode, final NodeSource source,
|
|
|
|
|
final RedisCommand<V> command) {
|
|
|
|
|
final RFuture<RedisConnection> connectionFuture;
|
|
|
|
|