From 7ca3f05ee1a165d19cf4fcc956c37a64ca0d4a8a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 3 May 2023 10:42:11 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/RedissonBaseAdder.java | 20 ++++++++----------- .../org/redisson/RedissonExecutorService.java | 9 +-------- .../org/redisson/RedissonLocalCachedMap.java | 2 +- .../java/org/redisson/RedissonObject.java | 7 ++++++- .../RedissonPermitExpirableSemaphore.java | 8 +------- .../org/redisson/RedissonRateLimiter.java | 4 +--- .../org/redisson/RedissonReliableTopic.java | 10 +--------- .../java/org/redisson/RedissonTimeSeries.java | 12 +++++------ .../redisson/cache/LocalCacheListener.java | 14 ++++--------- .../redisson/connection/ServiceManager.java | 10 ++++++++++ .../redisson/remote/BaseRemoteService.java | 10 +++++----- .../transaction/RedissonTransaction.java | 14 +++++-------- 12 files changed, 49 insertions(+), 71 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 02368f2da..cf6749ee0 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -15,7 +15,6 @@ */ package org.redisson; -import io.netty.buffer.ByteBufUtil; import org.redisson.api.RFuture; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; @@ -26,7 +25,10 @@ import org.redisson.misc.CompletableFutureWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @@ -83,12 +85,6 @@ public abstract class RedissonBaseAdder extends RedissonExpira protected abstract void doReset(); - private String generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); - } - public void reset() { get(resetAsync()); } @@ -98,7 +94,7 @@ public abstract class RedissonBaseAdder extends RedissonExpira } public RFuture sumAsync() { - String id = generateId(); + String id = getServiceManager().generateId(); RFuture future = topic.publishAsync(SUM_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); @@ -117,7 +113,7 @@ public abstract class RedissonBaseAdder extends RedissonExpira } public RFuture sumAsync(long timeout, TimeUnit timeUnit) { - String id = generateId(); + String id = getServiceManager().generateId(); RFuture future = topic.publishAsync(SUM_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); CompletionStage f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue())) @@ -140,7 +136,7 @@ public abstract class RedissonBaseAdder extends RedissonExpira } public RFuture resetAsync() { - String id = generateId(); + String id = commandExecutor.getServiceManager().generateId(); RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); CompletionStage f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue())) @@ -149,7 +145,7 @@ public abstract class RedissonBaseAdder extends RedissonExpira } public RFuture resetAsync(long timeout, TimeUnit timeUnit) { - String id = generateId(); + String id = commandExecutor.getServiceManager().generateId(); RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); CompletionStage f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue())) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 5ba45604f..ca6ad9c48 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -16,7 +16,6 @@ package org.redisson; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; @@ -172,12 +171,6 @@ public class RedissonExecutorService implements RScheduledExecutorService { idGenerator = options.getIdGenerator(); } - protected String generateActiveWorkersId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); - } - @Override public int getTaskCount() { return commandExecutor.get(getTaskCountAsync()); @@ -210,7 +203,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public int countActiveWorkers() { - String id = generateActiveWorkersId(); + String id = commandExecutor.getServiceManager().generateId(); int subscribers = (int) workersTopic.publish(id); if (subscribers == 0) { return 0; diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 1aa83cb08..e57019b6d 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -596,7 +596,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture deleteAsync() { cache.clear(); - ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, listener.generateId(), false)); + ByteBuf msgEncoded = encode(new LocalCachedMapClear(instanceId, getServiceManager().generateIdArray(), false)); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then " + "redis.call('publish', KEYS[2], ARGV[1]); " diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 48b78ace5..d5cd8bf4e 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -23,6 +23,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.ServiceManager; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; @@ -67,7 +68,11 @@ public abstract class RedissonObject implements RObject { } return prefix + ":{" + name + "}"; } - + + public ServiceManager getServiceManager() { + return commandExecutor.getServiceManager(); + } + public static String suffixName(String name, String suffix) { if (name.contains("{")) { return name + ":" + suffix; diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 19164b1e4..f1e336927 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -336,18 +336,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen return new CompletableFutureWrapper<>(f); } - protected byte[] generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return id; - } - private RFuture tryAcquireAsync(int permits, long timeoutDate) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } - byte[] id = generateId(); + byte[] id = getServiceManager().generateIdArray(); return commandExecutor.getServiceManager().execute(() -> { RFuture future = tryAcquireAsync(id, permits, timeoutDate); return commandExecutor.handleNoSync(future, () -> releaseAsync(ByteBufUtil.hexDump(id))); diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index 98d724c93..d89606ac9 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -29,7 +29,6 @@ import org.redisson.misc.CompletableFutureWrapper; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -171,8 +170,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit } private RFuture tryAcquireAsync(RedisCommand command, Long value) { - byte[] random = new byte[16]; - ThreadLocalRandom.current().nextBytes(random); + byte[] random = getServiceManager().generateIdArray(); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" diff --git a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java index dd6c30703..321facaa2 100644 --- a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java @@ -15,7 +15,6 @@ */ package org.redisson; -import io.netty.buffer.ByteBufUtil; import io.netty.util.Timeout; import org.redisson.api.RFuture; import org.redisson.api.RReliableTopic; @@ -34,7 +33,6 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -142,15 +140,9 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl Arrays.asList(getRawName(), getSubscribersName()), encode(message)); } - protected String generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); - } - @Override public RFuture addListenerAsync(Class type, MessageListener listener) { - String id = generateId(); + String id = getServiceManager().generateId(); listeners.put(id, new Entry(type, listener)); if (subscriberId.get() != null) { diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index ed19a33d9..532b167ab 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -23,7 +23,10 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.*; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder; +import org.redisson.client.protocol.decoder.TimeSeriesFirstEntryReplayDecoder; +import org.redisson.client.protocol.decoder.TimeSeriesSingleEntryReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.iterator.RedissonBaseIterator; @@ -31,7 +34,6 @@ import org.redisson.misc.CompletableFutureWrapper; import java.time.Duration; import java.util.*; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -134,8 +136,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime params.add(expirationTime); for (Map.Entry entry : objects.entrySet()) { params.add(entry.getKey()); - byte[] random = new byte[16]; - ThreadLocalRandom.current().nextBytes(random); + byte[] random = getServiceManager().generateIdArray(); params.add(random); encode(params, entry.getValue()); } @@ -193,8 +194,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime params.add(expirationTime); for (TimeSeriesEntry entry : entries) { params.add(entry.getTimestamp()); - byte[] random = new byte[16]; - ThreadLocalRandom.current().nextBytes(random); + byte[] random = getServiceManager().generateIdArray(); if (entry.getLabel() == null) { params.add(2); } else { diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index fe11292e0..59bd08a07 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -59,7 +59,7 @@ public abstract class LocalCacheListener { private CommandAsyncExecutor commandExecutor; private Map cache; private RObject object; - private byte[] instanceId = new byte[16]; + private byte[] instanceId; private Codec codec; private LocalCachedMapOptions options; @@ -82,14 +82,8 @@ public abstract class LocalCacheListener { this.codec = codec; this.options = options; this.cacheUpdateLogTime = cacheUpdateLogTime; - - ThreadLocalRandom.current().nextBytes(instanceId); - } - - public byte[] generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return id; + + instanceId = commandExecutor.getServiceManager().generateIdArray(); } public byte[] getInstanceId() { @@ -272,7 +266,7 @@ public abstract class LocalCacheListener { return new CompletableFutureWrapper<>((Void) null); } - byte[] id = generateId(); + byte[] id = commandExecutor.getServiceManager().generateIdArray(); RFuture future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true)); CompletionStage f = future.thenCompose(res -> { if (res.intValue() == 0) { diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 188fc4d8c..54cf488b5 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -419,4 +419,14 @@ public class ServiceManager { }); } + public String generateId() { + return ByteBufUtil.hexDump(generateIdArray()); + } + + public byte[] generateIdArray() { + byte[] id = new byte[16]; + ThreadLocalRandom.current().nextBytes(id); + return id; + } + } diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java index 65a1944bb..39404536b 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java @@ -16,7 +16,6 @@ package org.redisson.remote; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import io.netty.util.Timeout; @@ -40,7 +39,10 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -172,9 +174,7 @@ public abstract class BaseRemoteService { } protected String generateRequestId(Object[] args) { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); + return commandExecutor.getServiceManager().generateId(); } protected abstract CompletableFuture addAsync(String requestQueueName, RemoteServiceRequest request, diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 1946a24d0..a681fde84 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -61,12 +61,13 @@ public class RedissonTransaction implements RTransaction { private RedissonTransactionalBuckets bucketsCodecInstance; private final long startTime = System.currentTimeMillis(); - private final String id = generateId(); + private final String id; public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options) { super(); this.options = options; this.commandExecutor = commandExecutor; + this.id = commandExecutor.getServiceManager().generateId(); } public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options, @@ -76,6 +77,7 @@ public class RedissonTransaction implements RTransaction { this.options = options; this.operations = operations; this.localCaches = localCaches; + this.id = commandExecutor.getServiceManager().generateId(); } @Override @@ -212,7 +214,7 @@ public class RedissonTransaction implements RTransaction { transactionalOperation.commit(transactionExecutor); } - String id = generateId(); + String id = commandExecutor.getServiceManager().generateId(); CompletionStage> future = disableLocalCacheAsync(id, localCaches, operations); CompletionStage ff = future .handle((hashes, ex) -> { @@ -281,7 +283,7 @@ public class RedissonTransaction implements RTransaction { transactionalOperation.commit(transactionExecutor); } - String id = generateId(); + String id = commandExecutor.getServiceManager().generateId(); Map hashes = disableLocalCache(id, localCaches, operations); try { @@ -533,12 +535,6 @@ public class RedissonTransaction implements RTransaction { BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC)); } - protected static String generateId() { - byte[] id = new byte[16]; - ThreadLocalRandom.current().nextBytes(id); - return ByteBufUtil.hexDump(id); - } - @Override public void rollback() { rollback(operations);