Fixed - write-behind tasks aren't flushed after Redisson shutdown invocation. #3768

pull/3770/head
Nikita Koksharov 4 years ago
parent cfd65a932e
commit 23ed9d8f7d

@ -72,58 +72,64 @@ public class MapWriteBehindTask {
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
if (task != null) {
if (task instanceof MapWriterTask.Remove) {
for (Object key : task.getKeys()) {
try {
deletedKeys.add(key);
if (deletedKeys.size() == options.getWriteBehindBatchSize()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
} catch (Exception exception) {
log.error("Unable to delete keys: " + deletedKeys, exception);
}
}
} else {
for (Entry<Object, Object> entry : task.getMap().entrySet()) {
try {
addedMap.put(entry.getKey(), entry.getValue());
if (addedMap.size() == options.getWriteBehindBatchSize()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
} catch (Exception exception) {
log.error("Unable to add keys: " + addedMap, exception);
}
}
}
processTask(addedMap, deletedKeys, task);
pollTask(addedMap, deletedKeys);
} else {
try {
if (!deletedKeys.isEmpty()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
} catch (Exception exception) {
log.error("Unable to delete keys: " + deletedKeys, exception);
}
try {
if (!addedMap.isEmpty()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
} catch (Exception exception) {
log.error("Unable to add keys: " + addedMap, exception);
}
flushTasks(addedMap, deletedKeys);
enqueueTask();
}
});
});
}
private void flushTasks(Map<Object, Object> addedMap, List<Object> deletedKeys) {
try {
if (!deletedKeys.isEmpty()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
} catch (Exception exception) {
log.error("Unable to delete keys: " + deletedKeys, exception);
}
try {
if (!addedMap.isEmpty()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
} catch (Exception exception) {
log.error("Unable to add keys: " + addedMap, exception);
}
}
private void processTask(Map<Object, Object> addedMap, List<Object> deletedKeys, MapWriterTask task) {
if (task instanceof MapWriterTask.Remove) {
for (Object key : task.getKeys()) {
try {
deletedKeys.add(key);
if (deletedKeys.size() == options.getWriteBehindBatchSize()) {
options.getWriter().delete(deletedKeys);
deletedKeys.clear();
}
} catch (Exception exception) {
log.error("Unable to delete keys: " + deletedKeys, exception);
}
}
} else {
for (Entry<Object, Object> entry : task.getMap().entrySet()) {
try {
addedMap.put(entry.getKey(), entry.getValue());
if (addedMap.size() == options.getWriteBehindBatchSize()) {
options.getWriter().write(addedMap);
addedMap.clear();
}
} catch (Exception exception) {
log.error("Unable to add keys: " + addedMap, exception);
}
}
}
}
private void enqueueTask() {
if (!isStarted.get()) {
return;
@ -142,5 +148,12 @@ public class MapWriteBehindTask {
public void stop() {
isStarted.set(false);
Map<Object, Object> addedMap = new LinkedHashMap<>();
List<Object> deletedKeys = new ArrayList<>();
for (MapWriterTask task : writeBehindTasks.readAll()) {
processTask(addedMap, deletedKeys, task);
}
flushTasks(addedMap, deletedKeys);
}
}

@ -514,6 +514,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public void shutdown() {
writeBehindService.stop();
connectionManager.shutdown();
}

@ -523,6 +523,7 @@ public class RedissonRx implements RedissonRxClient {
@Override
public void shutdown() {
writeBehindService.stop();
connectionManager.shutdown();
}

@ -49,7 +49,11 @@ public class WriteBehindService {
task.start();
return task;
}
public void stop() {
tasks.values().forEach(t -> t.stop());
}
public void stop(String name) {
MapWriteBehindTask task = tasks.remove(name);
task.stop();

Loading…
Cancel
Save