refactoring write-behind persist mode #927

pull/968/head
Nikita 8 years ago
parent 01cefc5641
commit 4c80f29b2a

@ -16,8 +16,7 @@
package org.redisson; package org.redisson;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -36,16 +35,19 @@ public class MapWriteBehindListener<R> implements FutureListener<R> {
private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class); private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class);
private static final AtomicBoolean sent = new AtomicBoolean(); private final AtomicInteger writeBehindCurrentThreads;
private static final Queue<Runnable> operations = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> writeBehindTasks;
private final int threadsAmount;
private final MapWriterTask<R> task; private final MapWriterTask<R> task;
private final CommandAsyncExecutor commandExecutor; private final CommandAsyncExecutor commandExecutor;
public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task) { public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task, AtomicInteger writeBehindCurrentThreads, Queue<Runnable> writeBehindTasks, int threadsAmount) {
super(); super();
this.threadsAmount = threadsAmount;
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.task = task; this.task = task;
this.writeBehindCurrentThreads = writeBehindCurrentThreads;
this.writeBehindTasks = writeBehindTasks;
} }
@Override @Override
@ -66,16 +68,16 @@ public class MapWriteBehindListener<R> implements FutureListener<R> {
private void enqueueRunnable(Runnable runnable) { private void enqueueRunnable(Runnable runnable) {
if (runnable != null) { if (runnable != null) {
operations.add(runnable); writeBehindTasks.add(runnable);
} }
if (sent.compareAndSet(false, true)) { if (writeBehindCurrentThreads.incrementAndGet() <= threadsAmount) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
while (true) { while (true) {
Runnable runnable = operations.poll(); Runnable runnable = writeBehindTasks.poll();
if (runnable != null) { if (runnable != null) {
runnable.run(); runnable.run();
} else { } else {
@ -83,13 +85,14 @@ public class MapWriteBehindListener<R> implements FutureListener<R> {
} }
} }
} finally { } finally {
sent.set(false); if (writeBehindCurrentThreads.decrementAndGet() == 0 && !writeBehindTasks.isEmpty()) {
if (!operations.isEmpty()) {
enqueueRunnable(null); enqueueRunnable(null);
} }
} }
} }
}); });
} else {
writeBehindCurrentThreads.decrementAndGet();
} }
} }

@ -28,7 +28,9 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
@ -71,6 +73,8 @@ import io.netty.util.concurrent.FutureListener;
*/ */
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> { public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
final AtomicInteger writeBehindCurrentThreads = new AtomicInteger();
final Queue<Runnable> writeBehindTasks;
final RedissonClient redisson; final RedissonClient redisson;
final MapOptions<K, V> options; final MapOptions<K, V> options;
@ -78,12 +82,22 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
super(commandExecutor, name); super(commandExecutor, name);
this.redisson = redisson; this.redisson = redisson;
this.options = options; this.options = options;
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
writeBehindTasks = new ConcurrentLinkedQueue<Runnable>();
} else {
writeBehindTasks = null;
}
} }
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) { public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
this.redisson = redisson; this.redisson = redisson;
this.options = options; this.options = options;
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
writeBehindTasks = new ConcurrentLinkedQueue<Runnable>();
} else {
writeBehindTasks = null;
}
} }
@Override @Override
@ -274,7 +288,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask<M> listener) { protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask<M> listener) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener)); future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
return future; return future;
} }
@ -876,7 +890,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
options.getWriter().deleteAll(deletedKeys); options.getWriter().deleteAll(deletedKeys);
} }
}; };
future.addListener(new MapWriteBehindListener<List<Long>>(commandExecutor, listener)); future.addListener(new MapWriteBehindListener<List<Long>>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
} else { } else {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override @Override

@ -266,15 +266,23 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
} }
@Override @Override
public LocalCachedMapOptions<K, V> writer(MapWriter<K, V> writer, org.redisson.api.MapOptions.WriteMode writeMode) { public LocalCachedMapOptions<K, V> writer(MapWriter<K, V> writer) {
super.writer(writer, writeMode); return (LocalCachedMapOptions<K, V>) super.writer(writer);
return this; }
@Override
public LocalCachedMapOptions<K, V> writeBehindThreads(int writeBehindThreads) {
return (LocalCachedMapOptions<K, V>) super.writeBehindThreads(writeBehindThreads);
}
@Override
public LocalCachedMapOptions<K, V> writeMode(org.redisson.api.MapOptions.WriteMode writeMode) {
return (LocalCachedMapOptions<K, V>) super.writeMode(writeMode);
} }
@Override @Override
public LocalCachedMapOptions<K, V> loader(MapLoader<K, V> loader) { public LocalCachedMapOptions<K, V> loader(MapLoader<K, V> loader) {
super.loader(loader); return (LocalCachedMapOptions<K, V>) super.loader(loader);
return this;
} }
} }

@ -47,7 +47,8 @@ public class MapOptions<K, V> {
private MapLoader<K, V> loader; private MapLoader<K, V> loader;
private MapWriter<K, V> writer; private MapWriter<K, V> writer;
private WriteMode writeMode; private WriteMode writeMode = WriteMode.WRITE_THROUGH;
private int writeBehindThreads = 1;
protected MapOptions() { protected MapOptions() {
} }
@ -75,26 +76,53 @@ public class MapOptions<K, V> {
} }
/** /**
* Sets map writer object used for write-through operations. * Sets {@link MapWriter} object.
* *
* @param writer object * @param writer object
* @param writeMode for writer
* @return MapOptions instance * @return MapOptions instance
*/ */
public MapOptions<K, V> writer(MapWriter<K, V> writer, WriteMode writeMode) { public MapOptions<K, V> writer(MapWriter<K, V> writer) {
this.writer = writer; this.writer = writer;
this.writeMode = writeMode;
return this; return this;
} }
public MapWriter<K, V> getWriter() { public MapWriter<K, V> getWriter() {
return writer; return writer;
} }
/**
* Sets threads amount used in write behind mode.
* <p>
* Default is <code>1</code>
*
* @param writeBehindThreads - threads amount
* @return MapOptions instance
*/
public MapOptions<K, V> writeBehindThreads(int writeBehindThreads) {
this.writeBehindThreads = writeBehindThreads;
return this;
}
public int getWriteBehindThreads() {
return writeBehindThreads;
}
/**
* Sets write mode.
* <p>
* Default is <code>{@link WriteMode#WRITE_THROUGH}</code>
*
* @param writeMode - write mode
* @return MapOptions instance
*/
public MapOptions<K, V> writeMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}
public WriteMode getWriteMode() { public WriteMode getWriteMode() {
return writeMode; return writeMode;
} }
/** /**
* Sets map reader object used for write-through operations. * Sets {@link MapLoader} object.
* *
* @param loader object * @param loader object
* @return MapOptions instance * @return MapOptions instance

@ -18,7 +18,6 @@ import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.cache.Cache; import org.redisson.cache.Cache;
@ -51,7 +50,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getLocalCachedMap("test", options); return redisson.getLocalCachedMap("test", options);
} }

@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RMapCache; import org.redisson.api.RMapCache;
@ -141,7 +140,7 @@ public class RedissonMapCacheTest extends BaseMapTest {
@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getMapCache("test", options); return redisson.getMapCache("test", options);
} }

@ -19,7 +19,6 @@ import org.junit.Assert;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.MapOptions; import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -139,7 +138,7 @@ public class RedissonMapTest extends BaseMapTest {
@Override @Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) { protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH); MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map));
return redisson.getMap("test", options); return redisson.getMap("test", options);
} }

Loading…
Cancel
Save