refactoring

pull/5038/head
Nikita Koksharov 2 years ago
parent f13ff2b4b5
commit 7ca3f05ee1

@ -15,7 +15,6 @@
*/
package org.redisson;
import io.netty.buffer.ByteBufUtil;
import org.redisson.api.RFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
@ -26,7 +25,10 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*
@ -83,12 +85,6 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
protected abstract void doReset();
private String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
public void reset() {
get(resetAsync());
}
@ -98,7 +94,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
}
public RFuture<T> sumAsync() {
String id = generateId();
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
@ -117,7 +113,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
}
public RFuture<T> sumAsync(long timeout, TimeUnit timeUnit) {
String id = generateId();
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
CompletionStage<T> f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue()))
@ -140,7 +136,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
}
public RFuture<Void> resetAsync() {
String id = generateId();
String id = commandExecutor.getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
CompletionStage<Void> f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue()))
@ -149,7 +145,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
}
public RFuture<Void> resetAsync(long timeout, TimeUnit timeUnit) {
String id = generateId();
String id = commandExecutor.getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
CompletionStage<Void> f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue()))

@ -16,7 +16,6 @@
package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
@ -172,12 +171,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
idGenerator = options.getIdGenerator();
}
protected String generateActiveWorkersId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public int getTaskCount() {
return commandExecutor.get(getTaskCountAsync());
@ -210,7 +203,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public int countActiveWorkers() {
String id = generateActiveWorkersId();
String id = commandExecutor.getServiceManager().generateId();
int subscribers = (int) workersTopic.publish(id);
if (subscribers == 0) {
return 0;

@ -596,7 +596,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Boolean> deleteAsync() {
cache.clear();
ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, listener.generateId(), false));
ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, getServiceManager().generateIdArray(), false));
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "

@ -23,6 +23,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ServiceManager;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
@ -67,7 +68,11 @@ public abstract class RedissonObject implements RObject {
}
return prefix + ":{" + name + "}";
}
public ServiceManager getServiceManager() {
return commandExecutor.getServiceManager();
}
public static String suffixName(String name, String suffix) {
if (name.contains("{")) {
return name + ":" + suffix;

@ -336,18 +336,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return new CompletableFutureWrapper<>(f);
}
protected byte[] generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return id;
}
private RFuture<String> tryAcquireAsync(int permits, long timeoutDate) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
byte[] id = generateId();
byte[] id = getServiceManager().generateIdArray();
return commandExecutor.getServiceManager().execute(() -> {
RFuture<String> future = tryAcquireAsync(id, permits, timeoutDate);
return commandExecutor.handleNoSync(future, () -> releaseAsync(ByteBufUtil.hexDump(id)));

@ -29,7 +29,6 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
@ -171,8 +170,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
}
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
byte[] random = getServiceManager().generateIdArray();
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"

@ -15,7 +15,6 @@
*/
package org.redisson;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
@ -34,7 +33,6 @@ import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -142,15 +140,9 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
Arrays.asList(getRawName(), getSubscribersName()), encode(message));
}
protected String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public <M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener) {
String id = generateId();
String id = getServiceManager().generateId();
listeners.put(id, new Entry(type, listener));
if (subscriberId.get() != null) {

@ -23,7 +23,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.*;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder;
import org.redisson.client.protocol.decoder.TimeSeriesFirstEntryReplayDecoder;
import org.redisson.client.protocol.decoder.TimeSeriesSingleEntryReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.iterator.RedissonBaseIterator;
@ -31,7 +34,6 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@ -134,8 +136,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
params.add(expirationTime);
for (Map.Entry<Long, V> entry : objects.entrySet()) {
params.add(entry.getKey());
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
byte[] random = getServiceManager().generateIdArray();
params.add(random);
encode(params, entry.getValue());
}
@ -193,8 +194,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
params.add(expirationTime);
for (TimeSeriesEntry<V, L> entry : entries) {
params.add(entry.getTimestamp());
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
byte[] random = getServiceManager().generateIdArray();
if (entry.getLabel() == null) {
params.add(2);
} else {

@ -59,7 +59,7 @@ public abstract class LocalCacheListener {
private CommandAsyncExecutor commandExecutor;
private Map<CacheKey, ? extends CacheValue> cache;
private RObject object;
private byte[] instanceId = new byte[16];
private byte[] instanceId;
private Codec codec;
private LocalCachedMapOptions<?, ?> options;
@ -82,14 +82,8 @@ public abstract class LocalCacheListener {
this.codec = codec;
this.options = options;
this.cacheUpdateLogTime = cacheUpdateLogTime;
ThreadLocalRandom.current().nextBytes(instanceId);
}
public byte[] generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return id;
instanceId = commandExecutor.getServiceManager().generateIdArray();
}
public byte[] getInstanceId() {
@ -272,7 +266,7 @@ public abstract class LocalCacheListener {
return new CompletableFutureWrapper<>((Void) null);
}
byte[] id = generateId();
byte[] id = commandExecutor.getServiceManager().generateIdArray();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
CompletionStage<Void> f = future.thenCompose(res -> {
if (res.intValue() == 0) {

@ -419,4 +419,14 @@ public class ServiceManager {
});
}
public String generateId() {
return ByteBufUtil.hexDump(generateIdArray());
}
public byte[] generateIdArray() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return id;
}
}

@ -16,7 +16,6 @@
package org.redisson.remote;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.Timeout;
@ -40,7 +39,10 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -172,9 +174,7 @@ public abstract class BaseRemoteService {
}
protected String generateRequestId(Object[] args) {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
return commandExecutor.getServiceManager().generateId();
}
protected abstract CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,

@ -61,12 +61,13 @@ public class RedissonTransaction implements RTransaction {
private RedissonTransactionalBuckets bucketsCodecInstance;
private final long startTime = System.currentTimeMillis();
private final String id = generateId();
private final String id;
public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options) {
super();
this.options = options;
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getServiceManager().generateId();
}
public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options,
@ -76,6 +77,7 @@ public class RedissonTransaction implements RTransaction {
this.options = options;
this.operations = operations;
this.localCaches = localCaches;
this.id = commandExecutor.getServiceManager().generateId();
}
@Override
@ -212,7 +214,7 @@ public class RedissonTransaction implements RTransaction {
transactionalOperation.commit(transactionExecutor);
}
String id = generateId();
String id = commandExecutor.getServiceManager().generateId();
CompletionStage<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id, localCaches, operations);
CompletionStage<Void> ff = future
.handle((hashes, ex) -> {
@ -281,7 +283,7 @@ public class RedissonTransaction implements RTransaction {
transactionalOperation.commit(transactionExecutor);
}
String id = generateId();
String id = commandExecutor.getServiceManager().generateId();
Map<HashKey, HashValue> hashes = disableLocalCache(id, localCaches, operations);
try {
@ -533,12 +535,6 @@ public class RedissonTransaction implements RTransaction {
BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC));
}
protected static String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public void rollback() {
rollback(operations);

Loading…
Cancel
Save