refactoring

pull/1907/head
Nikita Koksharov 6 years ago
parent b7b6175b3b
commit 41f503d3f8

@ -26,7 +26,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@ -65,7 +67,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -76,8 +77,8 @@ public abstract class BaseRemoteService {
private static final Logger log = LoggerFactory.getLogger(BaseRemoteService.class);
private final Map<Class<?>, String> requestQueueNameCache = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Method, List<String>> methodSignaturesCache = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, String> requestQueueNameCache = new ConcurrentHashMap<>();
private final ConcurrentMap<Method, List<String>> methodSignaturesCache = new ConcurrentHashMap<>();
protected final Codec codec;
protected final RedissonClient redisson;
@ -412,7 +413,7 @@ public abstract class BaseRemoteService {
Map<RequestId, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.get(requestId);
if (list == null) {
list = new ArrayList<Result>(3);
list = new ArrayList<>(3);
entryResponses.put(requestId, list);
}
@ -650,8 +651,7 @@ public abstract class BaseRemoteService {
protected RequestId generateRequestId() {
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
id[0] = 00;
return new RequestId(id);
}
@ -679,7 +679,7 @@ public abstract class BaseRemoteService {
protected List<String> getMethodSignatures(Method method) {
List<String> result = methodSignaturesCache.get(method);
if (result == null) {
result = new ArrayList<String>(method.getParameterTypes().length);
result = new ArrayList<>(method.getParameterTypes().length);
for (Class<?> t : method.getParameterTypes()) {
result.add(t.getName());
}

@ -15,10 +15,9 @@
*/
package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -26,7 +25,7 @@ import io.netty.util.internal.PlatformDependent;
*/
public class QueueTransferService {
private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
public synchronized void schedule(String name, QueueTransferTask task) {
QueueTransferTask oldTask = tasks.putIfAbsent(name, task);

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -89,8 +90,6 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.remote.ResponseEntry;
import org.redisson.transaction.RedissonTransaction;
import io.netty.util.internal.PlatformDependent;
/**
* Main infrastructure class allows to get access
* to all Redisson objects on top of Redis server.
@ -108,11 +107,11 @@ public class Redisson implements RedissonClient {
protected final EvictionScheduler evictionScheduler;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>();
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
protected final ConcurrentMap<String, ResponseEntry> responses = PlatformDependent.newConcurrentHashMap();
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected Redisson(Config config) {
this.config = config;

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RDelayedQueue;
@ -32,8 +33,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -96,7 +95,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = PlatformDependent.threadLocalRandom().nextLong();
long randomId = ThreadLocalRandom.current().nextLong();
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"

@ -33,12 +33,14 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@ -90,7 +92,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -136,7 +137,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final ScheduledTasksService scheduledRemoteService;
private final TasksService executorRemoteService;
private final Map<Class<?>, ClassBody> class2body = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, ClassBody> class2body = new ConcurrentHashMap<>();
private final String name;
private final String requestQueueName;
@ -145,8 +146,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String executorId;
private final ConcurrentMap<String, ResponseEntry> responses;
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(new ConcurrentHashMap<>());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, ExecutorOptions options) {
@ -211,8 +212,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
protected String generateRequestId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@ -580,7 +580,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException("Tasks are not defined");
}
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
List<RExecutorFuture<?>> result = new ArrayList<>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) {
@ -608,7 +608,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Callable<?> task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
@ -696,7 +696,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException("Tasks are not defined");
}
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
List<RExecutorFuture<?>> result = new ArrayList<>();
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) {
@ -724,7 +724,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Runnable task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
@ -1013,7 +1013,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new NullPointerException();
}
List<RExecutorFuture<?>> futures = new ArrayList<RExecutorFuture<?>>();
List<RExecutorFuture<?>> futures = new ArrayList<>();
for (Callable<T> callable : tasks) {
RExecutorFuture<T> future = submit(callable);
futures.add(future);

@ -34,6 +34,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RCascadeType;
@ -73,9 +74,8 @@ import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.liveobject.resolver.Resolver;
import org.redisson.liveobject.resolver.RIdResolver;
import io.netty.util.internal.PlatformDependent;
import jodd.bean.BeanCopy;
import jodd.bean.BeanUtil;
import net.bytebuddy.ByteBuddy;
@ -90,7 +90,7 @@ import net.bytebuddy.matcher.ElementMatchers;
public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> PROVIDER_CACHE = PlatformDependent.newConcurrentHashMap();
private static final ConcurrentMap<Class<? extends RIdResolver<?>>, RIdResolver<?>> PROVIDER_CACHE = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final RedissonClient redisson;
private final CommandAsyncExecutor commandExecutor;
@ -120,12 +120,12 @@ public class RedissonLiveObjectService implements RLiveObjectService {
String idFieldName = getRIdFieldName(entityClass);
RId annotation = ClassUtils.getDeclaredField(entityClass, idFieldName)
.getAnnotation(RId.class);
Resolver resolver = getResolver(entityClass, annotation.generator(), annotation);
RIdResolver<?> resolver = getResolver(entityClass, annotation.generator(), annotation);
Object id = resolver.resolve(entityClass, annotation, idFieldName, redisson);
return id;
}
private Resolver<?, ?, ?> getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno) {
private RIdResolver<?> getResolver(Class<?> cls, Class<? extends RIdResolver<?>> resolverClass, Annotation anno) {
if (!PROVIDER_CACHE.containsKey(resolverClass)) {
try {
PROVIDER_CACHE.putIfAbsent(resolverClass, resolverClass.newInstance());

@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
@ -67,13 +68,6 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
@SuppressWarnings("serial")
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
@ -265,8 +259,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
return id;
}
@ -274,8 +267,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] result = new byte[keyHash.length + 1 + 8];
result[16] = ':';
byte[] id = new byte[8];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
System.arraycopy(keyHash, 0, result, 0, keyHash.length);
System.arraycopy(id, 0, result, 17, id.length);

@ -18,6 +18,7 @@ package org.redisson;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
@ -80,7 +80,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = PlatformDependent.newConcurrentHashMap();
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
protected long internalLockLeaseTime;
final UUID id;

@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -33,8 +34,6 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import io.netty.util.internal.ThreadLocalRandom;
/**
* Groups multiple independent locks and manages them as one lock.
*

@ -19,6 +19,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RExecutorService;
@ -32,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -75,8 +75,7 @@ public final class RedissonNode {
private String generateId() {
byte[] id = new byte[8];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -33,7 +34,6 @@ import org.redisson.pubsub.SemaphorePubSub;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -365,8 +365,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
protected String generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
@ -31,8 +32,6 @@ import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.internal.PlatformDependent;
/**
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.PriorityBlockingQueue}.
*
@ -96,7 +95,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
}
}
long del = PlatformDependent.threadLocalRandom().nextInt(2000000);
long del = ThreadLocalRandom.current().nextInt(2000000);
if (timeoutInMicro > 0 && remain < 2000000) {
del = 0;
}

@ -18,6 +18,7 @@ package org.redisson;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -53,8 +54,6 @@ import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -87,8 +86,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Entry> remoteMap = PlatformDependent.newConcurrentHashMap();
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = new ConcurrentHashMap<>();
private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);

@ -41,6 +41,6 @@ public @interface RId {
* @see UUIDGenerator
* @see LongGenerator
*/
Class<? extends RIdResolver> generator() default RequiredIdResolver.class;
Class<? extends RIdResolver<?>> generator() default RequiredIdResolver.class;
}

@ -23,11 +23,10 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -38,7 +37,7 @@ import io.netty.util.internal.PlatformDependent;
public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
final int size;
final ConcurrentMap<K, CachedValue<K, V>> map = PlatformDependent.newConcurrentHashMap();
final ConcurrentMap<K, CachedValue<K, V>> map = new ConcurrentHashMap<>();
private final long timeToLiveInMillis;
private final long maxIdleInMillis;

@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.codec.Codec;
@ -40,7 +41,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -50,8 +50,8 @@ import io.netty.util.internal.PlatformDependent;
public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Map<ChannelName, Codec> channels = PlatformDependent.newConcurrentHashMap();
final Map<ChannelName, Codec> patternChannels = PlatformDependent.newConcurrentHashMap();
final Map<ChannelName, Codec> channels = new ConcurrentHashMap<>();
final Map<ChannelName, Codec> patternChannels = new ConcurrentHashMap<>();
final Set<ChannelName> unsubscibedChannels = new HashSet<ChannelName>();
final Set<ChannelName> punsubscibedChannels = new HashSet<ChannelName>();

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.redisson.client.ChannelName;
@ -41,7 +42,6 @@ import org.redisson.misc.LogHelper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.internal.PlatformDependent;
/**
* Redis Publish Subscribe protocol decoder
@ -53,8 +53,8 @@ public class CommandPubSubDecoder extends CommandDecoder {
private static final Set<String> MESSAGES = new HashSet<String>(Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe"));
// It is not needed to use concurrent map because responses are coming consecutive
private final Map<ChannelName, PubSubEntry> entries = new HashMap<ChannelName, PubSubEntry>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap();
private final Map<ChannelName, PubSubEntry> entries = new HashMap<>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = new ConcurrentHashMap<>();
private final boolean keepOrder;

@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -66,7 +67,6 @@ import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -77,7 +77,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = new ConcurrentHashMap<>();
private ScheduledFuture<?> monitorFuture;

@ -15,13 +15,14 @@
*/
package org.redisson.codec;
import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.api.RObject;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RObjectField;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.liveobject.misc.ClassUtils;
/**
@ -30,7 +31,7 @@ import org.redisson.liveobject.misc.ClassUtils;
*/
public class DefaultReferenceCodecProvider implements ReferenceCodecProvider {
private final ConcurrentMap<Class<? extends Codec>, Codec> codecCache = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Class<? extends Codec>, Codec> codecCache = new ConcurrentHashMap<>();
@Override
public <T extends Codec> T getCodec(Class<T> codecClass) {

@ -63,7 +63,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -123,12 +122,12 @@ public class CommandBatchService extends CommandAsyncService {
private AtomicInteger index = new AtomicInteger();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = PlatformDependent.newConcurrentHashMap();
private ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = PlatformDependent.newConcurrentHashMap();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = new ConcurrentHashMap<>();
private ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = new ConcurrentHashMap<>();
private BatchOptions options;
private Map<RFuture<?>, List<CommandBatchService>> nestedServices = PlatformDependent.newConcurrentHashMap();
private Map<RFuture<?>, List<CommandBatchService>> nestedServices = new ConcurrentHashMap<>();
private AtomicBoolean executed = new AtomicBoolean();

@ -17,17 +17,15 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.util.internal.PlatformDependent;
public class ConnectionEventsHub {
public enum Status {CONNECTED, DISCONNECTED};
private final ConcurrentMap<InetSocketAddress, Status> maps = PlatformDependent.newConcurrentHashMap();
private final Map<Integer, ConnectionListener> listenersMap = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<InetSocketAddress, Status> maps = new ConcurrentHashMap<>();
private final Map<Integer, ConnectionListener> listenersMap = new ConcurrentHashMap<>();
public int addListener(ConnectionListener listener) {
int id = System.identityHashCode(listener);

@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -128,8 +129,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig config;
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<MasterSlaveEntry>(MAX_SLOT);
private final Map<RedisClient, MasterSlaveEntry> client2entry = PlatformDependent.newConcurrentHashMap();
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
private final RPromise<Boolean> shutdownPromise;
@ -149,7 +150,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private PublishSubscribeService subscribeService;
private final Map<Object, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final Map<Object, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -56,7 +57,6 @@ import io.netty.resolver.AddressResolver;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -67,10 +67,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, RedisClient> sentinels = new ConcurrentHashMap<>();
private final AtomicReference<String> currentMaster = new AtomicReference<>();
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private final Set<URI> disconnectedSlaves = new HashSet<>();
private ScheduledFuture<?> monitorFuture;
private AddressResolver<InetSocketAddress> sentinelResolver;
@ -127,7 +127,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
List<Map<String, String>> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
List<RFuture<Void>> connectionFutures = new ArrayList<RFuture<Void>>(sentinelSentinels.size());
List<RFuture<Void>> connectionFutures = new ArrayList<>(sentinelSentinels.size());
for (Map<String, String> map : sentinelSentinels) {
if (map.isEmpty()) {
continue;
@ -186,7 +186,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {
List<RedisClient> sentinels = new ArrayList<RedisClient>(SentinelConnectionManager.this.sentinels.values());
List<RedisClient> sentinels = new ArrayList<>(SentinelConnectionManager.this.sentinels.values());
final AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size());
FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
@ -336,7 +336,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
Set<String> currentSlaves = new HashSet<String>(slavesMap.size());
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
List<RFuture<Void>> futures = new ArrayList<>();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
continue;
@ -543,7 +543,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
monitorFuture.cancel(true);
}
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
List<RFuture<Void>> futures = new ArrayList<>();
for (RedisClient sentinel : sentinels.values()) {
RFuture<Void> future = sentinel.shutdownAsync();
futures.add(future);

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@ -43,8 +44,6 @@ import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -58,7 +57,7 @@ public class LoadBalancerManager {
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;
private final Map<RedisClient, ClientConnectionsEntry> client2Entry = PlatformDependent.newConcurrentHashMap();
private final Map<RedisClient, ClientConnectionsEntry> client2Entry = new ConcurrentHashMap<>();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;

@ -16,11 +16,10 @@
package org.redisson.connection.balancer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.redisson.connection.ClientConnectionsEntry;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -30,7 +29,7 @@ public class RandomLoadBalancer implements LoadBalancer {
@Override
public ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clientsCopy) {
int ind = PlatformDependent.threadLocalRandom().nextInt(clientsCopy.size());
int ind = ThreadLocalRandom.current().nextInt(clientsCopy.size());
return clientsCopy.get(ind);
}

@ -25,11 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.connection.ClientConnectionsEntry;
import io.netty.util.internal.PlatformDependent;
import org.redisson.misc.URIBuilder;
/**
@ -66,7 +65,7 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1);
private final Map<InetSocketAddress, WeightEntry> weights = PlatformDependent.newConcurrentHashMap();
private final Map<InetSocketAddress, WeightEntry> weights = new ConcurrentHashMap<>();
private final int defaultWeight;
@ -93,7 +92,7 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
}
private Set<InetSocketAddress> getAddresses(List<ClientConnectionsEntry> clients) {
Set<InetSocketAddress> result = new HashSet<InetSocketAddress>();
Set<InetSocketAddress> result = new HashSet<>();
for (ClientConnectionsEntry entry : clients) {
if (entry.isFreezed()) {
continue;
@ -108,14 +107,14 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
Set<InetSocketAddress> addresses = getAddresses(clients);
if (!addresses.equals(weights.keySet())) {
Set<InetSocketAddress> newAddresses = new HashSet<InetSocketAddress>(addresses);
Set<InetSocketAddress> newAddresses = new HashSet<>(addresses);
newAddresses.removeAll(weights.keySet());
for (InetSocketAddress addr : newAddresses) {
weights.put(addr, new WeightEntry(defaultWeight));
}
}
Map<InetSocketAddress, WeightEntry> weightsCopy = new HashMap<InetSocketAddress, WeightEntry>(weights);
Map<InetSocketAddress, WeightEntry> weightsCopy = new HashMap<>(weights);
synchronized (this) {
@ -158,7 +157,7 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
}
private List<ClientConnectionsEntry> findClients(List<ClientConnectionsEntry> clients, Map<InetSocketAddress, WeightEntry> weightsCopy) {
List<ClientConnectionsEntry> clientsCopy = new ArrayList<ClientConnectionsEntry>();
List<ClientConnectionsEntry> clientsCopy = new ArrayList<>();
for (InetSocketAddress addr : weightsCopy.keySet()) {
for (ClientConnectionsEntry clientConnectionsEntry : clients) {
if (clientConnectionsEntry.getClient().getAddr().equals(addr)

@ -15,12 +15,11 @@
*/
package org.redisson.eviction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.command.CommandAsyncExecutor;
import io.netty.util.internal.PlatformDependent;
/**
* Eviction scheduler.
* Deletes expired entries in time interval between 5 seconds to 2 hours.
@ -32,7 +31,7 @@ import io.netty.util.internal.PlatformDependent;
*/
public class EvictionScheduler {
private final ConcurrentMap<String, EvictionTask> tasks = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, EvictionTask> tasks = new ConcurrentHashMap<>();
private final CommandAsyncExecutor executor;
public EvictionScheduler(CommandAsyncExecutor executor) {

@ -17,6 +17,7 @@ package org.redisson.executor;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
@ -33,8 +34,6 @@ import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -136,8 +135,7 @@ public class ScheduledTasksService extends TasksService {
protected RequestId generateRequestId() {
if (requestId == null) {
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
id[0] = 1;
return new RequestId(id);
}

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.Cache;
import javax.cache.CacheException;
@ -73,7 +74,6 @@ import org.redisson.jcache.configuration.JCacheConfiguration;
import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
/**
* JCache implementation
@ -241,7 +241,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
List<Object> result = new ArrayList<Object>(3);
result.add(value);
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
@ -406,7 +406,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private boolean putValueLocked(K key, Object value) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
@ -480,7 +480,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean putValue(K key, Object value) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
Long creationTimeout = getCreationTimeout();
Long updateTimeout = getUpdateTimeout();
@ -949,7 +949,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private List<Object> getAndPutValueLocked(K key, V value) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
@ -1021,7 +1021,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Long updateTimeout = getUpdateTimeout();
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
@ -1351,7 +1351,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private boolean removeValue(K key) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hexists', KEYS[1], ARGV[2]); "
@ -1588,7 +1588,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private V getAndRemoveValue(K key) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
@ -1704,7 +1704,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
if (res == 1) {
Long updateTimeout = getUpdateTimeout();
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
@ -1745,7 +1745,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
return -1;
}
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
@ -1904,7 +1904,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean replaceValueLocked(K key, V value) {
if (containsKey(key)) {
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
Long updateTimeout = getUpdateTimeout();
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
@ -2045,7 +2045,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
if (oldValue != null) {
Long updateTimeout = getUpdateTimeout();
double syncId = PlatformDependent.threadLocalRandom().nextDouble();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "

@ -16,7 +16,6 @@
package org.redisson.jcache;
import java.io.IOException;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
@ -63,7 +62,7 @@ public class JCacheEventCodec implements Codec {
result.add(value);
if (sync) {
double syncId = buf.order(ByteOrder.LITTLE_ENDIAN).readDouble();
double syncId = buf.readDoubleLE();
result.add(syncId);
}

@ -22,13 +22,12 @@ import org.redisson.api.annotation.RId;
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class LongGenerator implements RIdResolver<RId, Long> {
public class LongGenerator implements RIdResolver<Long> {
public static final LongGenerator INSTANCE
= new LongGenerator();
public static final LongGenerator INSTANCE = new LongGenerator();
@Override
public Long resolve(Class value, RId id, String idFieldName, RedissonClient redisson) {
public Long resolve(Class<?> value, RId id, String idFieldName, RedissonClient redisson) {
return redisson.getAtomicLong(this.getClass().getCanonicalName()
+ "{" + value.getCanonicalName() + "}:" + idFieldName)
.incrementAndGet();

@ -21,10 +21,9 @@ import org.redisson.api.annotation.RId;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
* @param <A> RId annotation to resolve
* @param <V> Value type
*/
public interface RIdResolver<A extends RId, V> extends Resolver<Class<?>, A, V>{
public interface RIdResolver<V> {
/**
* RLiveObjectService instantiate the class and invokes this method to get
@ -32,10 +31,10 @@ public interface RIdResolver<A extends RId, V> extends Resolver<Class<?>, A, V>{
*
* @param cls the class of the LiveObject.
* @param annotation the RId annotation used in the class.
* @param idFieldName field id
* @param redisson instance
* @return resolved RId field value.
*/
@Override
V resolve(Class<?> cls, A annotation, String idFieldName, RedissonClient redisson);
V resolve(Class<?> cls, RId annotation, String idFieldName, RedissonClient redisson);
}

@ -23,12 +23,12 @@ import org.redisson.api.annotation.RId;
* @author Nikita Koksharov
*
*/
public class RequiredIdResolver implements RIdResolver<RId, Object> {
public class RequiredIdResolver implements RIdResolver<Object> {
public static final RequiredIdResolver INSTANCE = new RequiredIdResolver();
@Override
public Object resolve(Class cls, RId annotation, String idFieldName, RedissonClient redisson) {
public Object resolve(Class<?> cls, RId annotation, String idFieldName, RedissonClient redisson) {
throw new IllegalArgumentException("id value not defined for instance of " + cls);
}

@ -1,45 +0,0 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.liveobject.resolver;
import java.lang.annotation.Annotation;
import org.redisson.api.RedissonClient;
/**
* A resolver is used to provide value based on contextual parameters
*
* @author Rui Gu (https://github.com/jackygurui)
* @param <T> Field instance type
* @param <A> Annotation to resolve
* @param <V> Value type
*/
public interface Resolver<T, A extends Annotation, V> {
/**
* Used to provide a value instance based on contextual parameters.
*
* Actual behavior may vary depending on implementation
*
* @param value object
* @param annotation object
* @param idFieldName name of field
* @param redisson instance
* @return resolved value
*/
V resolve(T value, A annotation, String idFieldName, RedissonClient redisson);
}

@ -24,7 +24,7 @@ import org.redisson.api.annotation.RId;
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class UUIDGenerator implements RIdResolver<RId, String>{
public class UUIDGenerator implements RIdResolver<String>{
public static final UUIDGenerator INSTANCE = new UUIDGenerator();

@ -15,6 +15,7 @@
*/
package org.redisson.pubsub;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@ -29,8 +30,6 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
@ -38,10 +37,10 @@ import io.netty.util.internal.PlatformDependent;
*/
abstract class PublishSubscribe<E extends PubSubEntry<E>> {
private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
public void unsubscribe(final E entry, final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
public void unsubscribe(E entry, String entryName, String channelName, PublishSubscribeService subscribeService) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
semaphore.acquire(new Runnable() {
@Override
public void run() {
@ -64,10 +63,10 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
return entries.get(entryName);
}
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
final RPromise<E> newPromise = new RedissonPromise<E>() {
public RFuture<E> subscribe(String entryName, String channelName, PublishSubscribeService subscribeService) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
@ -111,7 +110,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
protected abstract void onMessage(E value, Long message);
private RedisPubSubListener<Object> createListener(final String channelName, final E value) {
private RedisPubSubListener<Object> createListener(String channelName, E value) {
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override

@ -17,6 +17,7 @@ package org.redisson.pubsub;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -60,7 +60,7 @@ public class PublishSubscribeService {
private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
protected final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
protected final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();

@ -641,7 +641,7 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<Map<K, V>> getAllOperationAsync(Set<K> keys) {
RPromise<Map<K, V>> result = new RedissonPromise<>();
Set<K> keysToLoad = new HashSet<K>(keys);
Set<K> keysToLoad = new HashSet<K>();
Map<K, V> map = new HashMap<K, V>();
for (K key : keys) {
HashValue keyHash = toKeyHash(key);
@ -656,6 +656,10 @@ public class BaseTransactionalMap<K, V> {
}
}
if (keysToLoad.isEmpty()) {
return RedissonPromise.newSucceededFuture(map);
}
RFuture<Map<K, V>> future = ((RedissonMap<K, V>) this.map).getAllOperationAsync(keysToLoad);
future.onComplete((res, e) -> {
if (e != null) {

Loading…
Cancel
Save