From ec5873bb1bc8af31f8e4483bb76655758ff5fd8e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 18 Apr 2019 10:12:22 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonExecutorService.java | 44 +++++++++---------- .../org/redisson/RedissonLocalCachedMap.java | 40 ++--------------- .../redisson/cache/LocalCacheListener.java | 41 ++++++++++++++--- .../cluster/ClusterConnectionManager.java | 2 +- .../connection/SentinelConnectionManager.java | 4 ++ 5 files changed, 63 insertions(+), 68 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index a4f984840..dc68f332b 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -321,10 +321,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public void execute(Runnable task) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); RemotePromise promise = (RemotePromise) asyncServiceWithoutResult.executeRunnable( - new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + createTaskParameters(task)); syncExecute(promise); } @@ -338,9 +336,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); for (Runnable task : tasks) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + asyncServiceWithoutResult.executeRunnable(createTaskParameters(task)); } List result = (List) executorRemoteService.executeAdd(); @@ -567,9 +563,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submitAsync(Callable task) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise result = (RemotePromise) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise result = (RemotePromise) asyncService.executeCallable(createTaskParameters(task)); addListener(result); return createFuture(result); } @@ -585,9 +579,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Callable task : tasks) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise promise = (RemotePromise) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise promise = (RemotePromise) asyncService.executeCallable(createTaskParameters(task)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -599,7 +591,19 @@ public class RedissonExecutorService implements RScheduledExecutorService { return new RedissonExecutorBatchFuture(result); } + + protected TaskParameters createTaskParameters(Callable task) { + ClassBody classBody = getClassBody(task); + byte[] state = encode(task); + return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); + } + protected TaskParameters createTaskParameters(Runnable task) { + ClassBody classBody = getClassBody(task); + byte[] state = encode(task); + return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state); + } + @Override public RExecutorBatchFuture submitAsync(Callable...tasks) { if (tasks.length == 0) { @@ -611,9 +615,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { List> result = new ArrayList<>(); for (Callable task : tasks) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise promise = (RemotePromise) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise promise = (RemotePromise) asyncService.executeCallable(createTaskParameters(task)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -701,9 +703,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); for (Runnable task : tasks) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise promise = (RemotePromise) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise promise = (RemotePromise) asyncService.executeRunnable(createTaskParameters(task)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -727,9 +727,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { List> result = new ArrayList<>(); for (Runnable task : tasks) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise promise = (RemotePromise) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise promise = (RemotePromise) asyncService.executeRunnable(createTaskParameters(task)); RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -767,9 +765,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submitAsync(Runnable task) { check(task); - ClassBody classBody = getClassBody(task); - byte[] state = encode(task); - RemotePromise result = (RemotePromise) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); + RemotePromise result = (RemotePromise) asyncService.executeRunnable(createTaskParameters(task)); addListener(result); return createFuture(result); } diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index d87bd40cd..f605b589a 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -33,7 +33,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.redisson.api.LocalCachedMapOptions; -import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.RFuture; @@ -41,16 +40,12 @@ import org.redisson.api.RLocalCachedMap; import org.redisson.api.RedissonClient; import org.redisson.cache.Cache; import org.redisson.cache.CacheKey; -import org.redisson.cache.LFUCacheMap; -import org.redisson.cache.LRUCacheMap; import org.redisson.cache.LocalCacheListener; import org.redisson.cache.LocalCacheView; import org.redisson.cache.LocalCachedMapClear; import org.redisson.cache.LocalCachedMapInvalidate; import org.redisson.cache.LocalCachedMapUpdate; import org.redisson.cache.LocalCachedMessageCodec; -import org.redisson.cache.NoneCacheMap; -import org.redisson.cache.ReferenceCacheMap; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -145,13 +140,9 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } private void init(String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) { - instanceId = generateId(); - syncStrategy = options.getSyncStrategy(); - cache = createCache(options); - - listener = new LocalCacheListener(name, commandExecutor, cache, this, instanceId, codec, options, cacheUpdateLogTime) { + listener = new LocalCacheListener(name, commandExecutor, this, codec, options, cacheUpdateLogTime) { @Override protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { @@ -162,7 +153,9 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } }; - listener.add(); + cache = listener.createCache(options); + instanceId = listener.generateId(); + listener.add(cache); localCacheView = new LocalCacheView(cache, this); if (options.getSyncStrategy() != SyncStrategy.NONE) { @@ -182,25 +175,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R cache.put(cacheKey, new CacheValue(key, value)); } - protected Cache createCache(LocalCachedMapOptions options) { - if (options.getEvictionPolicy() == EvictionPolicy.NONE) { - return new NoneCacheMap(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); - } - if (options.getEvictionPolicy() == EvictionPolicy.LRU) { - return new LRUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); - } - if (options.getEvictionPolicy() == EvictionPolicy.LFU) { - return new LFUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); - } - if (options.getEvictionPolicy() == EvictionPolicy.SOFT) { - return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); - } - if (options.getEvictionPolicy() == EvictionPolicy.WEAK) { - return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); - } - throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy()); - } - public CacheKey toCacheKey(Object key) { ByteBuf encoded = encodeMapKey(key); try { @@ -259,12 +233,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } - protected static byte[] generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return id; - } - protected static byte[] generateLogEntryId(byte[] keyHash) { byte[] result = new byte[keyHash.length + 1 + 8]; result[16] = ':'; diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index 265e5ba9d..525190880 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -23,13 +23,16 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.redisson.RedissonListMultimapCache; import org.redisson.RedissonObject; import org.redisson.RedissonScoredSortedSet; import org.redisson.RedissonTopic; +import org.redisson.RedissonLocalCachedMap.CacheValue; import org.redisson.api.LocalCachedMapOptions; +import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.RFuture; @@ -70,7 +73,7 @@ public abstract class LocalCacheListener { private CommandAsyncExecutor commandExecutor; private Cache cache; private RObject object; - private byte[] instanceId; + private byte[] instanceId = new byte[16]; private Codec codec; private LocalCachedMapOptions options; @@ -80,24 +83,48 @@ public abstract class LocalCacheListener { private int syncListenerId; private int reconnectionListenerId; - public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, Cache cache, - RObject object, byte[] instanceId, Codec codec, LocalCachedMapOptions options, long cacheUpdateLogTime) { + public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, + RObject object, Codec codec, LocalCachedMapOptions options, long cacheUpdateLogTime) { super(); this.name = name; this.commandExecutor = commandExecutor; - this.cache = cache; this.object = object; - this.instanceId = instanceId; this.codec = codec; this.options = options; this.cacheUpdateLogTime = cacheUpdateLogTime; } - + + public byte[] generateId() { + ThreadLocalRandom.current().nextBytes(instanceId); + return instanceId; + } + + public Cache createCache(LocalCachedMapOptions options) { + if (options.getEvictionPolicy() == EvictionPolicy.NONE) { + return new NoneCacheMap(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); + } + if (options.getEvictionPolicy() == EvictionPolicy.LRU) { + return new LRUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); + } + if (options.getEvictionPolicy() == EvictionPolicy.LFU) { + return new LFUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); + } + if (options.getEvictionPolicy() == EvictionPolicy.SOFT) { + return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); + } + if (options.getEvictionPolicy() == EvictionPolicy.WEAK) { + return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); + } + throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy()); + } + public boolean isDisabled(Object key) { return disabledKeys.containsKey(key); } - public void add() { + public void add(Cache cache) { + this.cache = cache; + invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 8fb254935..fbd4c5a74 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -376,7 +376,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (ClusterNodeInfo clusterNodeInfo : nodes) { nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); } - log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); + log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); } Collection newPartitions = parsePartitions(nodes); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 47787c094..bc7b09722 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -99,6 +99,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } List master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); + if (master.isEmpty()) { + throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!"); + } + String masterHost = createAddress(master.get(0), master.get(1)); this.config.setMasterAddress(masterHost); currentMaster.set(masterHost);