From 758acbe5f313a9c4060a219789aac5689638ec3e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 31 Oct 2019 12:53:19 +0300 Subject: [PATCH] Feature - RMap write-behind queue should be fault-tolerant #2388 --- .../java/org/redisson/MapWriteBehindTask.java | 93 +++++++++++-------- .../main/java/org/redisson/MapWriterTask.java | 6 +- .../java/org/redisson/WriteBehindService.java | 6 +- 3 files changed, 62 insertions(+), 43 deletions(-) diff --git a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java index a869ff175..a053e4ce6 100644 --- a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java +++ b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java @@ -20,12 +20,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.api.MapOptions; +import org.redisson.api.RFuture; +import org.redisson.api.RQueue; import org.redisson.command.CommandAsyncExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,52 +39,57 @@ public class MapWriteBehindTask { private static final Logger log = LoggerFactory.getLogger(MapWriteBehindTask.class); - private final AtomicBoolean isScheduled = new AtomicBoolean(); - private final Queue writeBehindTasks = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean isStarted = new AtomicBoolean(); + private final RQueue writeBehindTasks; private final CommandAsyncExecutor commandExecutor; private final MapOptions options; - public MapWriteBehindTask(CommandAsyncExecutor commandExecutor, MapOptions options) { + public MapWriteBehindTask(String name, CommandAsyncExecutor commandExecutor, MapOptions options) { super(); this.commandExecutor = commandExecutor; this.options = (MapOptions) options; + String queueName = RedissonObject.suffixName(name, "write-behind-queue"); + this.writeBehindTasks = new RedissonQueue<>(commandExecutor, queueName, null); } - private void enqueueTask() { - if (!isScheduled.compareAndSet(false, true)) { + public void start() { + if (!isStarted.compareAndSet(false, true)) { return; } - - commandExecutor.getConnectionManager().newTimeout(t -> { + + enqueueTask(); + } + + private void pollTask(Map addedMap, List deletedKeys) { + RFuture future = writeBehindTasks.pollAsync(); + future.onComplete((task, e) -> { + if (e != null) { + log.error(e.getMessage(), e); + return; + } + commandExecutor.getConnectionManager().getExecutor().execute(() -> { - Map addedMap = new LinkedHashMap<>(); - List deletedKeys = new ArrayList<>(); - try { - while (true) { - MapWriterTask task = writeBehindTasks.poll(); - if (task == null) { - break; - } - - if (task instanceof MapWriterTask.Remove) { - for (Object key : task.getKeys()) { - deletedKeys.add(key); - if (deletedKeys.size() == options.getWriteBehindBatchSize()) { - options.getWriter().delete(deletedKeys); - deletedKeys.clear(); - } + if (task != null) { + if (task instanceof MapWriterTask.Remove) { + for (Object key : task.getKeys()) { + deletedKeys.add(key); + if (deletedKeys.size() == options.getWriteBehindBatchSize()) { + options.getWriter().delete(deletedKeys); + deletedKeys.clear(); } - } else { - for (Entry entry : task.getMap().entrySet()) { - addedMap.put(entry.getKey(), entry.getValue()); - if (addedMap.size() == options.getWriteBehindBatchSize()) { - options.getWriter().write(addedMap); - addedMap.clear(); - } + } + } else { + for (Entry entry : task.getMap().entrySet()) { + addedMap.put(entry.getKey(), entry.getValue()); + if (addedMap.size() == options.getWriteBehindBatchSize()) { + options.getWriter().write(addedMap); + addedMap.clear(); } } } - + + pollTask(addedMap, deletedKeys); + } else { if (!deletedKeys.isEmpty()) { options.getWriter().delete(deletedKeys); deletedKeys.clear(); @@ -93,22 +98,30 @@ public class MapWriteBehindTask { options.getWriter().write(addedMap); addedMap.clear(); } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - isScheduled.set(false); - if (!writeBehindTasks.isEmpty()) { enqueueTask(); } }); + }); + } + + private void enqueueTask() { + if (!isStarted.get()) { + return; + } + + commandExecutor.getConnectionManager().newTimeout(t -> { + Map addedMap = new LinkedHashMap<>(); + List deletedKeys = new ArrayList<>(); + pollTask(addedMap, deletedKeys); }, options.getWriteBehindDelay(), TimeUnit.MILLISECONDS); } public void addTask(MapWriterTask task) { writeBehindTasks.add(task); + } - enqueueTask(); + public void stop() { + isStarted.set(false); } - } diff --git a/redisson/src/main/java/org/redisson/MapWriterTask.java b/redisson/src/main/java/org/redisson/MapWriterTask.java index 973324637..82646a41f 100644 --- a/redisson/src/main/java/org/redisson/MapWriterTask.java +++ b/redisson/src/main/java/org/redisson/MapWriterTask.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -24,10 +25,13 @@ import java.util.Map; * @author Nikita Koksharov * */ -public class MapWriterTask { +public class MapWriterTask implements Serializable { public static class Remove extends MapWriterTask { + public Remove() { + } + public Remove(Collection keys) { super(keys); } diff --git a/redisson/src/main/java/org/redisson/WriteBehindService.java b/redisson/src/main/java/org/redisson/WriteBehindService.java index c46fffe4d..f636f9b9d 100644 --- a/redisson/src/main/java/org/redisson/WriteBehindService.java +++ b/redisson/src/main/java/org/redisson/WriteBehindService.java @@ -41,16 +41,18 @@ public class WriteBehindService { return task; } - task = new MapWriteBehindTask(executor, options); + task = new MapWriteBehindTask(name, executor, options); MapWriteBehindTask prevTask = tasks.putIfAbsent(name, task); if (prevTask != null) { task = prevTask; } + task.start(); return task; } public void stop(String name) { - tasks.remove(name); + MapWriteBehindTask task = tasks.remove(name); + task.stop(); } }