Feature - RMap write-behind queue should be fault-tolerant #2388

pull/2400/head
Nikita Koksharov 5 years ago
parent 94d350c744
commit 758acbe5f3

@ -20,12 +20,12 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.MapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,52 +39,57 @@ public class MapWriteBehindTask {
private static final Logger log = LoggerFactory.getLogger(MapWriteBehindTask.class);
private final AtomicBoolean isScheduled = new AtomicBoolean();
private final Queue<MapWriterTask> writeBehindTasks = new ConcurrentLinkedQueue<>();
private final AtomicBoolean isStarted = new AtomicBoolean();
private final RQueue<MapWriterTask> writeBehindTasks;
private final CommandAsyncExecutor commandExecutor;
private final MapOptions<Object, Object> options;
public MapWriteBehindTask(CommandAsyncExecutor commandExecutor, MapOptions<?, ?> options) {
public MapWriteBehindTask(String name, CommandAsyncExecutor commandExecutor, MapOptions<?, ?> options) {
super();
this.commandExecutor = commandExecutor;
this.options = (MapOptions<Object, Object>) options;
String queueName = RedissonObject.suffixName(name, "write-behind-queue");
this.writeBehindTasks = new RedissonQueue<>(commandExecutor, queueName, null);
}
private void enqueueTask() {
if (!isScheduled.compareAndSet(false, true)) {
public void start() {
if (!isStarted.compareAndSet(false, true)) {
return;
}
commandExecutor.getConnectionManager().newTimeout(t -> {
enqueueTask();
}
private void pollTask(Map<Object, Object> addedMap, List<Object> deletedKeys) {
RFuture<MapWriterTask> future = writeBehindTasks.pollAsync();
future.onComplete((task, e) -> {
if (e != null) {
log.error(e.getMessage(), e);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
Map<Object, Object> addedMap = new LinkedHashMap<>();
List<Object> deletedKeys = new ArrayList<>();
try {
while (true) {
MapWriterTask task = writeBehindTasks.poll();
if (task == null) {
break;
}
if (task instanceof MapWriterTask.Remove) {
for (Object key : task.getKeys()) {
deletedKeys.add(key);
if (deletedKeys.size() == options.getWriteBehindBatchSize()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
if (task != null) {
if (task instanceof MapWriterTask.Remove) {
for (Object key : task.getKeys()) {
deletedKeys.add(key);
if (deletedKeys.size() == options.getWriteBehindBatchSize()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
} else {
for (Entry<Object, Object> entry : task.getMap().entrySet()) {
addedMap.put(entry.getKey(), entry.getValue());
if (addedMap.size() == options.getWriteBehindBatchSize()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
}
} else {
for (Entry<Object, Object> entry : task.getMap().entrySet()) {
addedMap.put(entry.getKey(), entry.getValue());
if (addedMap.size() == options.getWriteBehindBatchSize()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
}
}
pollTask(addedMap, deletedKeys);
} else {
if (!deletedKeys.isEmpty()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
@ -93,22 +98,30 @@ public class MapWriteBehindTask {
options.getWriter().write(addedMap);
addedMap.clear();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
isScheduled.set(false);
if (!writeBehindTasks.isEmpty()) {
enqueueTask();
}
});
});
}
private void enqueueTask() {
if (!isStarted.get()) {
return;
}
commandExecutor.getConnectionManager().newTimeout(t -> {
Map<Object, Object> addedMap = new LinkedHashMap<>();
List<Object> deletedKeys = new ArrayList<>();
pollTask(addedMap, deletedKeys);
}, options.getWriteBehindDelay(), TimeUnit.MILLISECONDS);
}
public void addTask(MapWriterTask task) {
writeBehindTasks.add(task);
}
enqueueTask();
public void stop() {
isStarted.set(false);
}
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@ -24,10 +25,13 @@ import java.util.Map;
* @author Nikita Koksharov
*
*/
public class MapWriterTask {
public class MapWriterTask implements Serializable {
public static class Remove extends MapWriterTask {
public Remove() {
}
public Remove(Collection<?> keys) {
super(keys);
}

@ -41,16 +41,18 @@ public class WriteBehindService {
return task;
}
task = new MapWriteBehindTask(executor, options);
task = new MapWriteBehindTask(name, executor, options);
MapWriteBehindTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask != null) {
task = prevTask;
}
task.start();
return task;
}
public void stop(String name) {
tasks.remove(name);
MapWriteBehindTask task = tasks.remove(name);
task.stop();
}
}

Loading…
Cancel
Save