MapWriter (write-behind) support for RMap added. #927

pull/968/head
Nikita 8 years ago
parent eae170f3a7
commit c3ef66b23d

@ -0,0 +1,97 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public class MapWriteBehindListener<R> implements FutureListener<R> {
private static final Logger log = LoggerFactory.getLogger(MapWriteBehindListener.class);
private static final AtomicBoolean sent = new AtomicBoolean();
private static final Queue<Runnable> operations = new ConcurrentLinkedQueue<Runnable>();
private final MapWriterTask<R> task;
private final CommandAsyncExecutor commandExecutor;
public MapWriteBehindListener(CommandAsyncExecutor commandExecutor, MapWriterTask<R> task) {
super();
this.commandExecutor = commandExecutor;
this.task = task;
}
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isSuccess() && task.condition(future)) {
enqueueRunnable(new Runnable() {
@Override
public void run() {
try {
task.execute();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
}
private void enqueueRunnable(Runnable runnable) {
if (runnable != null) {
operations.add(runnable);
}
if (sent.compareAndSet(false, true)) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
Runnable runnable = operations.poll();
if (runnable != null) {
runnable.run();
} else {
break;
}
}
} finally {
sent.set(false);
if (!operations.isEmpty()) {
enqueueRunnable(null);
}
}
}
});
}
}
}

@ -1,63 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.concurrent.ExecutorService;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
* @param <R> result type
*/
public abstract class MapWriterExecutorPromise<R> extends MapWriterPromise<R> {
public MapWriterExecutorPromise(RFuture<R> f, CommandAsyncExecutor commandExecutor) {
super(f, commandExecutor);
}
@Override
public void execute(final Future<R> future, ExecutorService executorService) {
if (condition(future)) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
executeWriter();
} catch (Exception e) {
tryFailure(e);
return;
}
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
protected boolean condition(Future<R> future) {
return true;
}
protected abstract void executeWriter();
}

@ -15,8 +15,6 @@
*/
package org.redisson;
import java.util.concurrent.ExecutorService;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
@ -30,23 +28,35 @@ import io.netty.util.concurrent.FutureListener;
*
* @param <R> result type
*/
public abstract class MapWriterPromise<R> extends RedissonPromise<R> {
public class MapWriterPromise<R> extends RedissonPromise<R> {
public MapWriterPromise(RFuture<R> f, final CommandAsyncExecutor commandExecutor) {
super();
public MapWriterPromise(RFuture<R> f, final CommandAsyncExecutor commandExecutor, final MapWriterTask<R> task) {
f.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
public void operationComplete(final Future<R> future) throws Exception {
if (!future.isSuccess()) {
tryFailure(future.cause());
return;
}
execute(future, commandExecutor.getConnectionManager().getExecutor());
if (task.condition(future)) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
task.execute();
} catch (Exception e) {
tryFailure(e);
return;
}
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
});
}
public abstract void execute(Future<R> future, ExecutorService executorService);
}

@ -0,0 +1,34 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public abstract class MapWriterTask<R> {
protected boolean condition(Future<R> future) {
return true;
}
protected abstract void execute();
}

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
@ -265,12 +266,12 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMap<K, V> getMap(String name, MapOptions<K, V> options) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, options);
}
@Override
@ -315,32 +316,32 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMapCache<K, V> getMapCache(String name, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null, null);
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, mapLoader, mapWriter);
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, options);
}
@Override

@ -102,12 +102,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(executorService, name, null, null, null);
return new RedissonMap<K, V>(executorService, name, null, null);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, executorService, name, null, null, null);
return new RedissonMap<K, V>(codec, executorService, name, null, null);
}
@Override
@ -202,12 +202,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null, null, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null, null);
}
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null, null, null);
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null, null);
}
@Override

@ -206,12 +206,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private volatile long lastInvalidate;
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options.getMapLoader(), options.getMapWriter());
super(commandExecutor, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}
protected RedissonLocalCachedMap(Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(codec, connectionManager, name, redisson, options.getMapLoader(), options.getMapWriter());
super(codec, connectionManager, name, redisson, options);
init(name, options, redisson, evictionScheduler);
}

@ -31,13 +31,13 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@ -72,21 +72,18 @@ import io.netty.util.concurrent.FutureListener;
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
final RedissonClient redisson;
MapLoader<K, V> mapLoader;
MapWriter<K, V> mapWriter;
final MapOptions<K, V> options;
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) {
super(commandExecutor, name);
this.redisson = redisson;
this.mapLoader = mapLoader;
this.mapWriter = mapWriter;
this.options = options;
}
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions<K, V> options) {
super(codec, commandExecutor, name);
this.redisson = redisson;
this.mapLoader = mapLoader;
this.mapWriter = mapWriter;
this.options = options;
}
@Override
@ -192,7 +189,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
RFuture<Map<K, V>> future = getAllValuesAsync(keys);
if (mapLoader == null) {
if (hasNoLoader()) {
return future;
}
@ -223,6 +220,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return result;
}
protected boolean hasNoLoader() {
return options == null || options.getLoader() == null;
}
protected RFuture<Map<K, V>> getAllValuesAsync(final Set<K> keys) {
List<Object> args = new ArrayList<Object>(keys.size() + 1);
args.add(getName());
@ -258,17 +259,26 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
RFuture<Void> future = putAllOperationAsync(map);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Void> result = new MapWriterExecutorPromise<Void>(future, commandExecutor) {
MapWriterTask<Void> listener = new MapWriterTask<Void>() {
@Override
public void executeWriter() {
mapWriter.writeAll((Map<K, V>) map);
public void execute() {
options.getWriter().writeAll((Map<K, V>) map);
}
};
return result;
return mapWriterFuture(future, listener);
}
protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, MapWriterTask<M> listener) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener));
return future;
}
return new MapWriterPromise<M>(future, commandExecutor, listener);
}
protected RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map) {
@ -362,22 +372,27 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(key);
RFuture<V> future = putIfAbsentOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
protected void executeWriter() {
mapWriter.write(key, value);
public void execute() {
options.getWriter().write(key, value);
}
@Override
protected boolean condition(Future<V> future) {
return future.getNow() == null;
}
};
return result;
return mapWriterFuture(future, listener);
}
protected boolean hasNoWriter() {
return options == null || options.getWriter() == null;
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
@ -401,23 +416,23 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
RFuture<Boolean> future = fastPutIfAbsentOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
protected void executeWriter() {
mapWriter.write(key, value);
public void execute() {
options.getWriter().write(key, value);
}
@Override
protected boolean condition(Future<Boolean> future) {
return future.getNow();
}
};
return result;
};
return mapWriterFuture(future, listener);
}
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
@ -435,23 +450,23 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
RFuture<Boolean> future = removeOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
protected void executeWriter() {
mapWriter.delete((K) key);
public void execute() {
options.getWriter().delete((K) key);
}
@Override
protected boolean condition(Future<Boolean> future) {
return future.getNow();
}
};
return result;
};
return mapWriterFuture(future, listener);
}
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
@ -487,23 +502,23 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
RFuture<Boolean> future = replaceOperationAsync(key, oldValue, newValue);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
protected void executeWriter() {
mapWriter.write(key, newValue);
public void execute() {
options.getWriter().write(key, newValue);
}
@Override
protected boolean condition(Future<Boolean> future) {
return future.getNow();
}
};
return result;
};
return mapWriterFuture(future, listener);
}
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
@ -528,23 +543,23 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
RFuture<V> future = replaceOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
protected void executeWriter() {
mapWriter.write(key, value);
public void execute() {
options.getWriter().write(key, value);
}
@Override
protected boolean condition(Future<V> future) {
return future.getNow() != null;
}
};
return result;
};
return mapWriterFuture(future, listener);
}
protected RFuture<V> replaceOperationAsync(final K key, final V value) {
@ -568,7 +583,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkKey(key);
RFuture<V> future = getOperationAsync(key);
if (mapLoader == null) {
if (hasNoLoader()) {
return future;
}
@ -602,7 +617,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism) {
return loadAllAsync(mapLoader.loadAllKeys(), replaceExistingValues, parallelism, null);
return loadAllAsync(options.getLoader().loadAllKeys(), replaceExistingValues, parallelism, null);
}
@Override
@ -740,18 +755,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
RFuture<V> future = putOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
public void executeWriter() {
mapWriter.write(key, value);
public void execute() {
options.getWriter().write(key, value);
}
};
return result;
return mapWriterFuture(future, listener);
}
protected RFuture<V> putOperationAsync(K key, V value) {
@ -768,18 +782,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkKey(key);
RFuture<V> future = removeOperationAsync(key);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
public void executeWriter() {
mapWriter.delete(key);
public void execute() {
options.getWriter().delete(key);
}
};
return result;
return mapWriterFuture(future, listener);
}
protected RFuture<V> removeOperationAsync(K key) {
@ -796,18 +809,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
RFuture<Boolean> future = fastPutOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
public void executeWriter() {
mapWriter.write(key, value);
public void execute() {
options.getWriter().write(key, value);
}
};
return result;
return mapWriterFuture(future, listener);
}
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
@ -829,12 +841,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return newSucceededFuture(0L);
}
if (mapWriter == null) {
if (hasNoWriter()) {
return fastRemoveOperationAsync(keys);
}
RFuture<List<Long>> future = fastRemoveOperationBatchAsync(keys);
RFuture<List<Long>> future = fastRemoveOperationBatchAsync(keys);
final RPromise<Long> result = new RedissonPromise<Long>();
future.addListener(new FutureListener<List<Long>>() {
@Override
@ -844,19 +855,37 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return;
}
if (future.getNow().isEmpty()) {
result.trySuccess(0L);
return;
}
final List<K> deletedKeys = new ArrayList<K>();
for (int i = 0; i < future.getNow().size(); i++) {
if (future.getNow().get(i) == 1) {
deletedKeys.add(keys[i]);
}
}
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.deleteAll(deletedKeys);
result.trySuccess((long)deletedKeys.size());
}
});
if (options.getWriteMode() == WriteMode.WRITE_BEHIND) {
result.trySuccess((long)deletedKeys.size());
MapWriterTask<List<Long>> listener = new MapWriterTask<List<Long>>() {
@Override
public void execute() {
options.getWriter().deleteAll(deletedKeys);
}
};
future.addListener(new MapWriteBehindListener<List<Long>>(commandExecutor, listener));
} else {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
options.getWriter().deleteAll(deletedKeys);
result.trySuccess((long)deletedKeys.size());
}
});
}
}
});
return result;
@ -911,18 +940,18 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
checkValue(value);
final RFuture<V> future = addAndGetOperationAsync(key, value);
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
public void executeWriter() {
mapWriter.write(key, future.getNow());
public void execute() {
options.getWriter().write(key, future.getNow());
}
};
return result;
return mapWriterFuture(future, listener);
}
protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
@ -1096,7 +1125,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
final V value = mapLoader.load(key);
final V value = options.getLoader().load(key);
if (value == null) {
result.trySuccess(value);
return;

@ -23,16 +23,14 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.redisson.api.MapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCache;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
@ -59,7 +57,6 @@ import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -86,14 +83,14 @@ import io.netty.util.concurrent.FutureListener;
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
super(commandExecutor, name, redisson, mapLoader, mapWriter);
String name, RedissonClient redisson, MapOptions<K, V> options) {
super(commandExecutor, name, redisson, options);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
}
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
super(codec, commandExecutor, name, redisson, mapLoader, mapWriter);
String name, RedissonClient redisson, MapOptions<K, V> options) {
super(codec, commandExecutor, name, redisson, options);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
}
@ -316,14 +313,14 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
protected void executeWriter() {
mapWriter.write(key, value);
protected void execute() {
options.getWriter().write(key, value);
}
@Override
@ -331,9 +328,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return future.getNow() == null;
}
};
return result;
return mapWriterFuture(future, listener);
}
@Override
@ -584,17 +579,17 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
public void executeWriter() {
mapWriter.write(key, value);
protected void execute() {
options.getWriter().write(key, value);
}
};
return result;
return mapWriterFuture(future, listener);
}
@Override
@ -693,19 +688,17 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return val",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<V> result = new MapWriterExecutorPromise<V>(future, commandExecutor) {
MapWriterTask<V> listener = new MapWriterTask<V>() {
@Override
public void executeWriter() {
mapWriter.write(key, value);
protected void execute() {
options.getWriter().write(key, value);
}
};
return result;
return mapWriterFuture(future, listener);
}
String getTimeoutSetNameByKey(Object key) {
@ -1122,23 +1115,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
if (hasNoWriter()) {
return future;
}
RPromise<Boolean> result = new MapWriterExecutorPromise<Boolean>(future, commandExecutor) {
MapWriterTask<Boolean> listener = new MapWriterTask<Boolean>() {
@Override
protected void executeWriter() {
mapWriter.write(key, value);
protected void execute() {
options.getWriter().write(key, value);
}
@Override
protected boolean condition(Future<Boolean> future) {
return future.getNow();
}
};
return result;
return mapWriterFuture(future, listener);
}
@Override

@ -28,7 +28,7 @@ import org.redisson.api.map.MapWriter;
* @param <K> key type
* @param <V> value type
*/
public class LocalCachedMapOptions<K, V> {
public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
public enum InvalidationPolicy {
@ -264,33 +264,17 @@ public class LocalCachedMapOptions<K, V> {
public LocalCachedMapOptions<K, V> maxIdle(long maxIdle, TimeUnit timeUnit) {
return maxIdle(timeUnit.toMillis(maxIdle));
}
/**
* Sets map writer object used for write-through operations.
*
* @param writer object
* @return LocalCachedMapOptions instance
*/
public LocalCachedMapOptions<K, V> mapWriter(MapWriter<K, V> writer) {
this.mapWriter = writer;
@Override
public LocalCachedMapOptions<K, V> writer(MapWriter<K, V> writer, org.redisson.api.MapOptions.WriteMode writeMode) {
super.writer(writer, writeMode);
return this;
}
public MapWriter<K, V> getMapWriter() {
return mapWriter;
}
/**
* Sets map reader object used for write-through operations.
*
* @param loader object
* @return LocalCachedMapOptions instance
*/
public LocalCachedMapOptions<K, V> mapLoader(MapLoader<K, V> loader) {
this.mapLoader = loader;
@Override
public LocalCachedMapOptions<K, V> loader(MapLoader<K, V> loader) {
super.loader(loader);
return this;
}
public MapLoader<K, V> getMapLoader() {
return mapLoader;
}
}

@ -0,0 +1,110 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
/**
* RMap options object.
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class MapOptions<K, V> {
public enum WriteMode {
/**
* In write behind mode all data written in map object
* also written using MapWriter in asynchronous mode.
*/
WRITE_BEHIND,
/**
* In write through mode all write operations for map object
* are synchronized with MapWriter write operations.
* If MapWriter throws an error then it will be re-thrown to Map operation caller.
*/
WRITE_THROUGH
}
private MapLoader<K, V> loader;
private MapWriter<K, V> writer;
private WriteMode writeMode;
protected MapOptions() {
}
protected MapOptions(MapOptions<K, V> copy) {
}
/**
* Creates a new instance of MapOptions with default options.
* <p>
* This is equivalent to:
* <pre>
* new MapOptions()
* .writer(null, null).loader(null);
* </pre>
*
* @param <K> key type
* @param <V> value type
*
* @return MapOptions instance
*
*/
public static <K, V> MapOptions<K, V> defaults() {
return new MapOptions<K, V>();
}
/**
* Sets map writer object used for write-through operations.
*
* @param writer object
* @param writeMode for writer
* @return MapOptions instance
*/
public MapOptions<K, V> writer(MapWriter<K, V> writer, WriteMode writeMode) {
this.writer = writer;
this.writeMode = writeMode;
return this;
}
public MapWriter<K, V> getWriter() {
return writer;
}
public WriteMode getWriteMode() {
return writeMode;
}
/**
* Sets map reader object used for write-through operations.
*
* @param loader object
* @return MapOptions instance
*/
public MapOptions<K, V> loader(MapLoader<K, V> loader) {
this.loader = loader;
return this;
}
public MapLoader<K, V> getLoader() {
return loader;
}
}

@ -17,11 +17,9 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.codec.CodecProvider;
import org.redisson.config.Config;
import org.redisson.liveobject.provider.ResolverProvider;
/**
@ -89,25 +87,39 @@ public interface RedissonClient {
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.</p>
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param name - object name
* @param codec - codec for keys and values
* @return MapCache object
*/
<K, V> RMapCache<K, V> getMapCache(String name, Codec codec);
<K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter);
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
*
* @param <K> type of key
* @param <V> type of value
* @param name - object name
* @param codec - codec for keys and values
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
*
* @param <K> type of key
* @param <V> type of value
@ -116,7 +128,19 @@ public interface RedissonClient {
*/
<K, V> RMapCache<K, V> getMapCache(String name);
<K, V> RMapCache<K, V> getMapCache(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCache<K, V> getMapCache(String name, MapOptions<K, V> options);
/**
* Returns object holder instance by name.
@ -278,8 +302,17 @@ public interface RedissonClient {
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name);
<K, V> RMap<K, V> getMap(String name, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter);
/**
* Returns map instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name, MapOptions<K, V> options);
/**
* Returns map instance by name
@ -292,8 +325,19 @@ public interface RedissonClient {
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name, Codec codec);
<K, V> RMap<K, V> getMap(String name, Codec codec, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @param options - map options
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns Set based Multimap instance by name.

@ -18,6 +18,7 @@ import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.cache.Cache;
@ -50,13 +51,13 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().mapWriter(createMapWriter(map));
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH);
return redisson.getLocalCachedMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().mapLoader(createMapLoader(map));
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getLocalCachedMap("test", options);
}

@ -18,6 +18,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
@ -139,12 +141,14 @@ public class RedissonMapCacheTest extends BaseMapTest {
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
return redisson.getMapCache("test", null, createMapWriter(map));
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH);
return redisson.getMapCache("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
return redisson.getMapCache("test", createMapLoader(map), null);
MapOptions<K, V> options = MapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getMapCache("test", options);
}
@Test

@ -18,6 +18,8 @@ import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
@ -131,12 +133,14 @@ public class RedissonMapTest extends BaseMapTest {
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
return redisson.getMap("test", createMapLoader(map), null);
MapOptions<K, V> options = MapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
return redisson.getMap("test", null, createMapWriter(map));
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map), WriteMode.WRITE_THROUGH);
return redisson.getMap("test", options);
}
@Test

Loading…
Cancel
Save