refactoring

pull/2053/head
Nikita Koksharov 6 years ago
parent caaeaf591b
commit ec5873bb1b

@ -321,10 +321,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
check(task); check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncServiceWithoutResult.executeRunnable( RemotePromise<Void> promise = (RemotePromise<Void>) asyncServiceWithoutResult.executeRunnable(
new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state)); createTaskParameters(task));
syncExecute(promise); syncExecute(promise);
} }
@ -338,9 +336,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
ClassBody classBody = getClassBody(task); asyncServiceWithoutResult.executeRunnable(createTaskParameters(task));
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
} }
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd(); List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
@ -567,9 +563,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) { public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result); addListener(result);
return createFuture(result); return createFuture(result);
} }
@ -585,9 +579,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -600,6 +592,18 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return new RedissonExecutorBatchFuture(result); return new RedissonExecutorBatchFuture(result);
} }
protected TaskParameters createTaskParameters(Callable<?> task) {
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
}
protected TaskParameters createTaskParameters(Runnable task) {
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
}
@Override @Override
public RExecutorBatchFuture submitAsync(Callable<?>...tasks) { public RExecutorBatchFuture submitAsync(Callable<?>...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
@ -611,9 +615,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>(); List<RExecutorFuture<?>> result = new ArrayList<>();
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise); RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -701,9 +703,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -727,9 +727,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>(); List<RExecutorFuture<?>> result = new ArrayList<>();
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise); RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture); result.add(executorFuture);
} }
@ -767,9 +765,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override @Override
public RExecutorFuture<?> submitAsync(Runnable task) { public RExecutorFuture<?> submitAsync(Runnable task) {
check(task); check(task);
ClassBody classBody = getClassBody(task); RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
addListener(result); addListener(result);
return createFuture(result); return createFuture(result);
} }

@ -33,7 +33,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -41,16 +40,12 @@ import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.cache.Cache; import org.redisson.cache.Cache;
import org.redisson.cache.CacheKey; import org.redisson.cache.CacheKey;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCacheListener; import org.redisson.cache.LocalCacheListener;
import org.redisson.cache.LocalCacheView; import org.redisson.cache.LocalCacheView;
import org.redisson.cache.LocalCachedMapClear; import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapInvalidate; import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.LocalCachedMapUpdate; import org.redisson.cache.LocalCachedMapUpdate;
import org.redisson.cache.LocalCachedMessageCodec; import org.redisson.cache.LocalCachedMessageCodec;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -145,13 +140,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) { private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
syncStrategy = options.getSyncStrategy(); syncStrategy = options.getSyncStrategy();
cache = createCache(options); listener = new LocalCacheListener(name, commandExecutor, this, codec, options, cacheUpdateLogTime) {
listener = new LocalCacheListener(name, commandExecutor, cache, this, instanceId, codec, options, cacheUpdateLogTime) {
@Override @Override
protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
@ -162,7 +153,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
}; };
listener.add(); cache = listener.createCache(options);
instanceId = listener.generateId();
listener.add(cache);
localCacheView = new LocalCacheView(cache, this); localCacheView = new LocalCacheView(cache, this);
if (options.getSyncStrategy() != SyncStrategy.NONE) { if (options.getSyncStrategy() != SyncStrategy.NONE) {
@ -182,25 +175,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
cache.put(cacheKey, new CacheValue(key, value)); cache.put(cacheKey, new CacheValue(key, value));
} }
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<K, V> options) {
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
}
public CacheKey toCacheKey(Object key) { public CacheKey toCacheKey(Object key) {
ByteBuf encoded = encodeMapKey(key); ByteBuf encoded = encodeMapKey(key);
try { try {
@ -259,12 +233,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future; return future;
} }
protected static byte[] generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return id;
}
protected static byte[] generateLogEntryId(byte[] keyHash) { protected static byte[] generateLogEntryId(byte[] keyHash) {
byte[] result = new byte[keyHash.length + 1 + 8]; byte[] result = new byte[keyHash.length + 1 + 8];
result[16] = ':'; result[16] = ':';

@ -23,13 +23,16 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.RedissonListMultimapCache; import org.redisson.RedissonListMultimapCache;
import org.redisson.RedissonObject; import org.redisson.RedissonObject;
import org.redisson.RedissonScoredSortedSet; import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonTopic; import org.redisson.RedissonTopic;
import org.redisson.RedissonLocalCachedMap.CacheValue;
import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -70,7 +73,7 @@ public abstract class LocalCacheListener {
private CommandAsyncExecutor commandExecutor; private CommandAsyncExecutor commandExecutor;
private Cache<?, ?> cache; private Cache<?, ?> cache;
private RObject object; private RObject object;
private byte[] instanceId; private byte[] instanceId = new byte[16];
private Codec codec; private Codec codec;
private LocalCachedMapOptions<?, ?> options; private LocalCachedMapOptions<?, ?> options;
@ -80,24 +83,48 @@ public abstract class LocalCacheListener {
private int syncListenerId; private int syncListenerId;
private int reconnectionListenerId; private int reconnectionListenerId;
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, Cache<?, ?> cache, public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor,
RObject object, byte[] instanceId, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) { RObject object, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) {
super(); super();
this.name = name; this.name = name;
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.cache = cache;
this.object = object; this.object = object;
this.instanceId = instanceId;
this.codec = codec; this.codec = codec;
this.options = options; this.options = options;
this.cacheUpdateLogTime = cacheUpdateLogTime; this.cacheUpdateLogTime = cacheUpdateLogTime;
} }
public byte[] generateId() {
ThreadLocalRandom.current().nextBytes(instanceId);
return instanceId;
}
public Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<?, ?> options) {
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
}
public boolean isDisabled(Object key) { public boolean isDisabled(Object key) {
return disabledKeys.containsKey(key); return disabledKeys.containsKey(key);
} }
public void add() { public void add(Cache<?, ?> cache) {
this.cache = cache;
invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {

@ -376,7 +376,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (ClusterNodeInfo clusterNodeInfo : nodes) { for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
} }
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
} }
Collection<ClusterPartition> newPartitions = parsePartitions(nodes); Collection<ClusterPartition> newPartitions = parsePartitions(nodes);

@ -99,6 +99,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
if (master.isEmpty()) {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
String masterHost = createAddress(master.get(0), master.get(1)); String masterHost = createAddress(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost); this.config.setMasterAddress(masterHost);
currentMaster.set(masterHost); currentMaster.set(masterHost);

Loading…
Cancel
Save