refactoring

pull/6166/head
Nikita Koksharov 7 months ago
parent 3a39e41d01
commit 1a1bd11ff7

@ -28,27 +28,27 @@ public class QueueTransferService {
private final Map<String, QueueTransferTask> tasks = new ConcurrentHashMap<>(); private final Map<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
public void schedule(String name, QueueTransferTask task) { public void schedule(String name, QueueTransferTask task) {
QueueTransferTask oldTask = tasks.putIfAbsent(name, task); tasks.compute(name, (k, t) -> {
if (oldTask == null) { if (t == null) {
task.start(); task.start();
} else { return task;
oldTask.getLock().execute(() -> { }
oldTask.incUsage(); t.incUsage();
}); return t;
} });
} }
public void remove(String name) { public void remove(String name) {
QueueTransferTask task = tasks.get(name); tasks.compute(name, (k, task) -> {
if (task == null) { if (task == null) {
return; return null;
} }
task.getLock().execute(() -> {
if (task.decUsage() == 0) { if (task.decUsage() == 0) {
tasks.remove(name, task);
task.stop(); task.stop();
return null;
} }
return task;
}); });
} }

@ -22,7 +22,6 @@ import org.redisson.api.RTopic;
import org.redisson.api.listener.BaseStatusListener; import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.connection.ServiceManager; import org.redisson.connection.ServiceManager;
import org.redisson.misc.WrappedLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -62,8 +61,7 @@ public abstract class QueueTransferTask {
private int usage = 1; private int usage = 1;
private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>(); private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>();
private final ServiceManager serviceManager; private final ServiceManager serviceManager;
private final WrappedLock lock = new WrappedLock();
public QueueTransferTask(ServiceManager serviceManager) { public QueueTransferTask(ServiceManager serviceManager) {
super(); super();
this.serviceManager = serviceManager; this.serviceManager = serviceManager;
@ -157,7 +155,4 @@ public abstract class QueueTransferTask {
}); });
} }
public WrappedLock getLock() {
return lock;
}
} }

Loading…
Cancel
Save