refactoring

pull/3487/head
Nikita Koksharov 4 years ago
parent f9e8226404
commit 63fe4d5620

@ -494,7 +494,7 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
CommandBatchService es = new CommandBatchService(executorService);
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}

@ -547,7 +547,7 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
CommandBatchService es = new CommandBatchService(executorService);
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}

@ -62,6 +62,9 @@ public class Redisson implements RedissonClient {
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
if (config.isReferenceEnabled()) {
connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor());
}
@ -101,11 +104,7 @@ public class Redisson implements RedissonClient {
* @return Redisson instance
*/
public static RedissonClient create(Config config) {
Redisson redisson = new Redisson(config);
if (config.isReferenceEnabled()) {
redisson.enableRedissonReferenceSupport();
}
return redisson;
return new Redisson(config);
}
/**
@ -126,11 +125,7 @@ public class Redisson implements RedissonClient {
* @return Redisson instance
*/
public static RedissonRxClient createRx(Config config) {
RedissonRx react = new RedissonRx(config);
if (config.isReferenceEnabled()) {
react.enableRedissonReferenceSupport();
}
return react;
return new RedissonRx(config);
}
@ -152,11 +147,7 @@ public class Redisson implements RedissonClient {
* @return Redisson instance
*/
public static RedissonReactiveClient createReactive(Config config) {
RedissonReactive react = new RedissonReactive(config);
if (config.isReferenceEnabled()) {
react.enableRedissonReferenceSupport();
}
return react;
return new RedissonReactive(config);
}
@Override
@ -644,11 +635,7 @@ public class Redisson implements RedissonClient {
@Override
public RBatch createBatch(BatchOptions options) {
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
return new RedissonBatch(evictionScheduler, connectionManager, options);
}
@Override
@ -729,10 +716,6 @@ public class Redisson implements RedissonClient {
return connectionManager.isShuttingDown();
}
protected void enableRedissonReferenceSupport() {
this.connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
@Override
public <V> RPriorityQueue<V> getPriorityQueue(String name) {
return new RedissonPriorityQueue<V>(connectionManager.getCommandExecutor(), name, this);

@ -235,7 +235,7 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
BatchOptions options = BatchOptions.defaults()
.syncSlaves(entry.getAvailableSlaves(), 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor.getConnectionManager(), options);
return new CommandBatchService(commandExecutor, options);
}
protected void acquireFailed(long waitTime, TimeUnit unit, long threadId) {

@ -31,12 +31,10 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
private final BatchOptions options;
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.executorService = new CommandBatchService(connectionManager, options);
this.executorService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
this.evictionScheduler = evictionScheduler;
this.options = options;
}
@Override
@ -264,10 +262,6 @@ public class RedissonBatch implements RBatch {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}
protected void enableRedissonReferenceSupport(Redisson redisson) {
this.executorService.enableRedissonReferenceSupport(redisson);
}
@Override
public <K, V> RStreamAsync<K, V> getStream(String name) {
return new RedissonStream<K, V>(executorService, name);

@ -341,7 +341,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public RFuture<Void> setAsync(long fromIndex, long toIndex, boolean value) {
int val = toInt(value);
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.SETBIT_VOID, getName(), i, val);
}

@ -114,7 +114,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (int i = 0; i < indexes.length; i++) {
@ -165,7 +165,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (int i = 0; i < indexes.length; i++) {
@ -203,7 +203,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public long count() {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
RFuture<Map<String, String>> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
RBitSetAsync bs = createBitSet(executorService);
@ -265,7 +265,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
}
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +

@ -175,8 +175,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <T> List<T> persist(T... detachedObjects) {
CommandBatchService commandExecutor = new CommandBatchService(connectionManager);
commandExecutor.setObjectBuilder(connectionManager.getCommandExecutor().getObjectBuilder());
CommandBatchService commandExecutor = new CommandBatchService(connectionManager.getCommandExecutor());
Map<Class<?>, Class<?>> classCache = new HashMap<>();
Map<T, Object> detached2Attached = new LinkedHashMap<>();
@ -192,7 +191,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
name2id.put(liveMap.getName(), id);
}
CommandBatchService checkExecutor = new CommandBatchService(connectionManager);
CommandBatchService checkExecutor = new CommandBatchService(connectionManager.getCommandExecutor());
for (Entry<String, Object> entry : name2id.entrySet()) {
RMap<String, Object> map = new RedissonMap<>(checkExecutor, entry.getKey(), null, null, null);
map.containsKeyAsync("redisson_live_object");
@ -562,7 +561,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
@Override
public <T> long delete(Class<T> entityClass, Object... ids) {
CommandBatchService ce = new CommandBatchService(connectionManager);
CommandBatchService ce = new CommandBatchService(connectionManager.getCommandExecutor());
FieldList<InDefinedShape> fields = Introspectior.getFieldsWithAnnotation(entityClass.getSuperclass(), RIndex.class);
Set<String> fieldNames = fields.stream().map(f -> f.getName()).collect(Collectors.toSet());

@ -51,6 +51,10 @@ public class RedissonReactive implements RedissonReactiveClient {
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
if (config.isReferenceEnabled()) {
this.connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
@ -465,11 +469,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBatchReactive createBatch(BatchOptions options) {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, commandExecutor, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
return new RedissonBatchReactive(evictionScheduler, connectionManager, commandExecutor, options);
}
@Override
@ -515,10 +515,6 @@ public class RedissonReactive implements RedissonReactiveClient {
return connectionManager.isShuttingDown();
}
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
RMapCache<K, V> map = new RedissonMapCache<>(codec, evictionScheduler, commandExecutor, name, null, options, writeBehindService);

@ -49,6 +49,9 @@ public class RedissonRx implements RedissonRxClient {
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
if (config.isReferenceEnabled()) {
connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
commandExecutor = new CommandRxService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
@ -475,11 +478,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RBatchRx createBatch(BatchOptions options) {
RedissonBatchRx batch = new RedissonBatchRx(evictionScheduler, connectionManager, commandExecutor, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
return new RedissonBatchRx(evictionScheduler, connectionManager, commandExecutor, options);
}
@Override
@ -520,10 +519,6 @@ public class RedissonRx implements RedissonRxClient {
return connectionManager.isShuttingDown();
}
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options, writeBehindService);

@ -43,11 +43,11 @@ public interface CommandAsyncExecutor {
ConnectionManager getConnectionManager();
CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson);
void enableRedissonReferenceSupport(RedissonClient redisson);
CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive);
void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive);
CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonReactive);
void enableRedissonReferenceSupport(RedissonRxClient redissonReactive);
<V> RedisException convertException(RFuture<V> future);

@ -77,21 +77,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) {
public void enableRedissonReferenceSupport(RedissonClient redisson) {
enableRedissonReferenceSupport(redisson.getConfig(), redisson, null, null);
return this;
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
enableRedissonReferenceSupport(redissonReactive.getConfig(), null, redissonReactive, null);
return this;
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonRx) {
public void enableRedissonReferenceSupport(RedissonRxClient redissonRx) {
enableRedissonReferenceSupport(redissonRx.getConfig(), null, null, redissonRx);
return this;
}
private void enableRedissonReferenceSupport(Config config, RedissonClient redisson, RedissonReactiveClient redissonReactive, RedissonRxClient redissonRx) {
@ -664,7 +661,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (this instanceof CommandBatchService) {
executorService = (CommandBatchService) this;
} else {
executorService = new CommandBatchService(connectionManager);
executorService = new CommandBatchService(this);
}
for (String key : entry.getValue()) {

@ -103,30 +103,35 @@ public class CommandBatchService extends CommandAsyncService {
}
private final AsyncCountDownLatch latch = new AsyncCountDownLatch();
private AtomicInteger index = new AtomicInteger();
private final AtomicInteger index = new AtomicInteger();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = new ConcurrentHashMap<>();
private ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = new ConcurrentHashMap<>();
private final ConcurrentMap<MasterSlaveEntry, Entry> commands = new ConcurrentHashMap<>();
private final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = new ConcurrentHashMap<>();
private BatchOptions options = BatchOptions.defaults();
private final BatchOptions options;
private Map<RFuture<?>, List<CommandBatchService>> nestedServices = new ConcurrentHashMap<>();
private final Map<RFuture<?>, List<CommandBatchService>> nestedServices = new ConcurrentHashMap<>();
private AtomicBoolean executed = new AtomicBoolean();
private final AtomicBoolean executed = new AtomicBoolean();
public CommandBatchService(ConnectionManager connectionManager) {
super(connectionManager);
public CommandBatchService(CommandAsyncExecutor executor) {
this(executor.getConnectionManager(), BatchOptions.defaults(), executor.getObjectBuilder());
}
public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {
this(executor.getConnectionManager(), options, executor.getObjectBuilder());
}
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
this.options = options;
this(connectionManager.getCommandExecutor(), options);
}
public void setObjectBuilder(RedissonObjectBuilder objectBuilder) {
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder) {
super(connectionManager);
this.options = options;
this.objectBuilder = objectBuilder;
}
public BatchOptions getOptions() {
return options;
}

@ -36,7 +36,7 @@ public class TasksBatchService extends TasksService {
public TasksBatchService(Codec codec, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
batchCommandService = new CommandBatchService(commandExecutor.getConnectionManager());
batchCommandService = new CommandBatchService(commandExecutor);
}
@Override

@ -188,7 +188,7 @@ public class AccessorInterceptor {
if (commandExecutor instanceof CommandBatchService) {
ce = (CommandBatchService) commandExecutor;
} else {
ce = new CommandBatchService(connectionManager);
ce = new CommandBatchService(commandExecutor);
}
if (Number.class.isAssignableFrom(field.getType()) || PRIMITIVE_CLASSES.contains(field.getType())) {
@ -246,7 +246,7 @@ public class AccessorInterceptor {
ce = (CommandBatchService) commandExecutor;
skipExecution = true;
} else {
ce = new CommandBatchService(connectionManager);
ce = new CommandBatchService(commandExecutor);
}
if (arg instanceof Number) {

@ -123,7 +123,7 @@ public class LiveObjectInterceptor {
if (commandExecutor instanceof CommandBatchService) {
ce = (CommandBatchService) commandExecutor;
} else {
ce = new CommandBatchService(connectionManager);
ce = new CommandBatchService(commandExecutor);
}
Object idd = ((RLiveObject) me).getLiveObjectId();

@ -42,7 +42,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager, options);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
}
@Override
@ -80,10 +80,4 @@ public class CommandReactiveBatchService extends CommandReactiveService {
return batchService.executeAsync();
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
batchService.enableRedissonReferenceSupport(redissonReactive);
return super.enableRedissonReferenceSupport(redissonReactive);
}
}

@ -33,6 +33,7 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager);
objectBuilder = connectionManager.getCommandExecutor().getObjectBuilder();
}
@Override

@ -233,10 +233,6 @@ public class RedissonBatchReactive implements RBatchReactive {
return commandExecutor.reactive(() -> executorService.executeAsync());
}
public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
this.executorService.enableRedissonReferenceSupport(redissonReactive);
}
@Override
public <V> RGeoReactive<V> getGeo(String name) {
return ReactiveProxyBuilder.create(executorService, new RedissonGeo<V>(executorService, name, null),

@ -42,7 +42,7 @@ public class CommandRxBatchService extends CommandRxService {
public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager, options);
batchService = new CommandBatchService(connectionManager.getCommandExecutor(), options);
}
@Override
@ -80,10 +80,4 @@ public class CommandRxBatchService extends CommandRxService {
return batchService.executeAsync();
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonRxClient redissonReactive) {
batchService.enableRedissonReferenceSupport(redissonReactive);
return super.enableRedissonReferenceSupport(redissonReactive);
}
}

@ -35,6 +35,7 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
public CommandRxService(ConnectionManager connectionManager) {
super(connectionManager);
objectBuilder = connectionManager.getCommandExecutor().getObjectBuilder();
}
@Override

@ -243,10 +243,6 @@ public class RedissonBatchRx implements RBatchRx {
return commandExecutor.flowable(() -> executorService.executeAsync()).singleElement();
}
public void enableRedissonReferenceSupport(RedissonRxClient redissonRx) {
this.executorService.enableRedissonReferenceSupport(redissonRx);
}
@Override
public <V> RGeoRx<V> getGeo(String name) {
RedissonGeo<V> geo = new RedissonGeo<V>(executorService, name, null);

@ -201,7 +201,7 @@ public class RedissonTransaction implements RTransaction {
BatchOptions batchOptions = createOptions();
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions);
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor, batchOptions);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
@ -265,7 +265,7 @@ public class RedissonTransaction implements RTransaction {
BatchOptions batchOptions = createOptions();
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions);
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor, batchOptions);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
@ -558,7 +558,7 @@ public class RedissonTransaction implements RTransaction {
public void rollback(List<TransactionalOperation> operations) {
checkState();
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.rollback(executorService);
}
@ -577,7 +577,7 @@ public class RedissonTransaction implements RTransaction {
public RFuture<Void> rollbackAsync() {
checkState();
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.rollback(executorService);
}

Loading…
Cancel
Save