refactoring

pull/4879/head
Nikita Koksharov 2 years ago
parent 5f766b4a55
commit ac89d45c90

@ -23,6 +23,7 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,11 +62,11 @@ public abstract class QueueTransferTask {
private int usage = 1;
private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>();
private final ConnectionManager connectionManager;
private final ServiceManager serviceManager;
public QueueTransferTask(ConnectionManager connectionManager) {
public QueueTransferTask(ServiceManager serviceManager) {
super();
this.connectionManager = connectionManager;
this.serviceManager = serviceManager;
}
public void incUsage() {
@ -115,7 +116,7 @@ public abstract class QueueTransferTask {
long delay = startTime - System.currentTimeMillis();
if (delay > 10) {
Timeout timeout = connectionManager.getServiceManager().newTimeout(new TimerTask() {
Timeout timeout = serviceManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
pushTask();

@ -52,7 +52,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
queueName = prefixName("redisson_delay_queue", getRawName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {

@ -58,7 +58,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
public static final int TERMINATED_STATE = 2;
private final CommandAsyncExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Codec codec;
private final Redisson redisson;
@ -107,16 +106,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
this.connectionManager = commandExecutor.getConnectionManager();
this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name);
this.redisson = redisson;
this.queueTransferService = queueTransferService;
this.responses = responses;
if (codec == connectionManager.getServiceManager().getCfg().getCodec()) {
this.executorId = connectionManager.getServiceManager().getId();
if (codec == commandExecutor.getServiceManager().getCfg().getCodec()) {
this.executorId = commandExecutor.getServiceManager().getId();
} else {
this.executorId = connectionManager.getServiceManager().getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
this.executorId = commandExecutor.getServiceManager().getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId, responses);
@ -242,7 +240,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
throw new IllegalArgumentException("workers amount can't be zero");
}
QueueTransferTask task = new QueueTransferTask(connectionManager) {
QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) {
@Override
protected RTopic getTopic() {
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, schedulerChannelName);

Loading…
Cancel
Save