MapReduce implementation. #312

pull/831/head
Nikita 8 years ago
parent 5edf70a450
commit 5d83a86d33

@ -193,12 +193,12 @@ public class Redisson implements RedissonClient {
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(connectionManager.getCommandExecutor(), name);
return new RedissonGeo<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonGeo<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -233,12 +233,12 @@ public class Redisson implements RedissonClient {
@Override
public <V> RList<V> getList(String name) {
return new RedissonList<V>(connectionManager.getCommandExecutor(), name);
return new RedissonList<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RList<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonList<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -253,17 +253,17 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, connectionManager.getCommandExecutor(), name, options);
return new RedissonLocalCachedMap<K, V>(id, connectionManager.getCommandExecutor(), name, options, this);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, options);
return new RedissonLocalCachedMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, options, this);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(id, connectionManager.getCommandExecutor(), name);
return new RedissonMap<K, V>(id, connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -298,27 +298,27 @@ public class Redisson implements RedissonClient {
@Override
public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonSetCache<V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RSetCache<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonSetCache<V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(id, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonMapCache<K, V>(id, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name);
return new RedissonMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -338,12 +338,12 @@ public class Redisson implements RedissonClient {
@Override
public <V> RSet<V> getSet(String name) {
return new RedissonSet<V>(connectionManager.getCommandExecutor(), name);
return new RedissonSet<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RSet<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -399,17 +399,17 @@ public class Redisson implements RedissonClient {
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(connectionManager.getCommandExecutor(), name);
return new RedissonScoredSortedSet<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonScoredSortedSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public RLexSortedSet getLexSortedSet(String name) {
return new RedissonLexSortedSet(connectionManager.getCommandExecutor(), name);
return new RedissonLexSortedSet(connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -434,12 +434,12 @@ public class Redisson implements RedissonClient {
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name) {
return new RedissonBlockingFairQueue<V>(connectionManager.getCommandExecutor(), name, semaphorePubSub, id);
return new RedissonBlockingFairQueue<V>(connectionManager.getCommandExecutor(), name, semaphorePubSub, id, this);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec) {
return new RedissonBlockingFairQueue<V>(codec, connectionManager.getCommandExecutor(), name, semaphorePubSub, id);
return new RedissonBlockingFairQueue<V>(codec, connectionManager.getCommandExecutor(), name, semaphorePubSub, id, this);
}
@Override
@ -452,52 +452,52 @@ public class Redisson implements RedissonClient {
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(connectionManager.getCommandExecutor(), name);
return new RedissonQueue<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RQueue<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(connectionManager.getCommandExecutor(), name);
return new RedissonBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name) {
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, connectionManager.getCommandExecutor(), name);
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name, Codec codec) {
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, codec, connectionManager.getCommandExecutor(), name);
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RDeque<V> getDeque(String name) {
return new RedissonDeque<V>(connectionManager.getCommandExecutor(), name);
return new RedissonDeque<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RDeque<V> getDeque(String name, Codec codec) {
return new RedissonDeque<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<V>(connectionManager.getCommandExecutor(), name);
return new RedissonBlockingDeque<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
};
@Override

@ -42,7 +42,6 @@ import org.redisson.api.RScriptAsync;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RTopicAsync;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
@ -88,32 +87,32 @@ public class RedissonBatch implements RBatch {
@Override
public <V> RListAsync<V> getList(String name) {
return new RedissonList<V>(executorService, name);
return new RedissonList<V>(executorService, name, null);
}
@Override
public <V> RListAsync<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, executorService, name);
return new RedissonList<V>(codec, executorService, name, null);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(id, executorService, name);
return new RedissonMap<K, V>(id, executorService, name, null);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(id, codec, executorService, name);
return new RedissonMap<K, V>(id, codec, executorService, name, null);
}
@Override
public <V> RSetAsync<V> getSet(String name) {
return new RedissonSet<V>(executorService, name);
return new RedissonSet<V>(executorService, name, null);
}
@Override
public <V> RSetAsync<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, executorService, name);
return new RedissonSet<V>(codec, executorService, name, null);
}
@Override
@ -128,42 +127,42 @@ public class RedissonBatch implements RBatch {
@Override
public <V> RQueueAsync<V> getQueue(String name) {
return new RedissonQueue<V>(executorService, name);
return new RedissonQueue<V>(executorService, name, null);
}
@Override
public <V> RQueueAsync<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, executorService, name);
return new RedissonQueue<V>(codec, executorService, name, null);
}
@Override
public <V> RBlockingQueueAsync<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(executorService, name);
return new RedissonBlockingQueue<V>(executorService, name, null);
}
@Override
public <V> RBlockingQueueAsync<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, executorService, name);
return new RedissonBlockingQueue<V>(codec, executorService, name, null);
}
@Override
public <V> RBlockingDequeAsync<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<V>(executorService, name);
return new RedissonBlockingDeque<V>(executorService, name, null);
}
@Override
public <V> RBlockingDequeAsync<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<V>(codec, executorService, name);
return new RedissonBlockingDeque<V>(codec, executorService, name, null);
}
@Override
public <V> RDequeAsync<V> getDeque(String name) {
return new RedissonDeque<V>(executorService, name);
return new RedissonDeque<V>(executorService, name, null);
}
@Override
public <V> RDequeAsync<V> getDeque(String name, Codec codec) {
return new RedissonDeque<V>(codec, executorService, name);
return new RedissonDeque<V>(codec, executorService, name, null);
}
@Override
@ -178,17 +177,17 @@ public class RedissonBatch implements RBatch {
@Override
public <V> RScoredSortedSetAsync<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(executorService, name);
return new RedissonScoredSortedSet<V>(executorService, name, null);
}
@Override
public <V> RScoredSortedSetAsync<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, executorService, name);
return new RedissonScoredSortedSet<V>(codec, executorService, name, null);
}
@Override
public RLexSortedSetAsync getLexSortedSet(String name) {
return new RedissonLexSortedSet(executorService, name);
return new RedissonLexSortedSet(executorService, name, null);
}
@Override
@ -198,12 +197,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, executorService, name);
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, executorService, name, null);
}
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(id, evictionScheduler, executorService, name);
return new RedissonMapCache<K, V>(id, evictionScheduler, executorService, name, null);
}
@Override
@ -218,12 +217,12 @@ public class RedissonBatch implements RBatch {
@Override
public <V> RSetCacheAsync<V> getSetCache(String name) {
return new RedissonSetCache<V>(evictionScheduler, executorService, name);
return new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
}
@Override
public <V> RSetCacheAsync<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name);
return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
}
@Override
@ -268,12 +267,12 @@ public class RedissonBatch implements RBatch {
@Override
public <V> RGeoAsync<V> getGeo(String name) {
return new RedissonGeo<V>(executorService, name);
return new RedissonGeo<V>(executorService, name, null);
}
@Override
public <V> RGeoAsync<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, executorService, name);
return new RedissonGeo<V>(codec, executorService, name, null);
}
@Override

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -38,14 +39,14 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
private final RedissonBlockingQueue<V> blockingQueue;
protected RedissonBlockingDeque(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name);
protected RedissonBlockingDeque(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name, redisson);
}
protected RedissonBlockingDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
blockingQueue = new RedissonBlockingQueue<V>(codec, commandExecutor, name);
protected RedissonBlockingDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<V>(codec, commandExecutor, name, redisson);
}
@Override

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -51,15 +52,15 @@ public class RedissonBlockingFairQueue<V> extends RedissonBlockingQueue<V> imple
private final AtomicInteger instances = new AtomicInteger();
private final SemaphorePubSub semaphorePubSub;
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(commandExecutor, name);
protected RedissonBlockingFairQueue(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id, RedissonClient redisson) {
super(commandExecutor, name, redisson);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
instances.incrementAndGet();
}
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id) {
super(codec, commandExecutor, name);
protected RedissonBlockingFairQueue(Codec codec, CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub, UUID id, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
this.semaphorePubSub = semaphorePubSub;
this.id = id;
instances.incrementAndGet();

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -40,12 +41,12 @@ import org.redisson.connection.decoder.ListDrainToDecoder;
*/
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
protected RedissonBlockingQueue(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
protected RedissonBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
protected RedissonBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}
@Override

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
@ -47,14 +48,14 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
private final CommandExecutor commandExecutor;
private final SemaphorePubSub semaphorePubSub;
protected RedissonBoundedBlockingQueue(SemaphorePubSub semaphorePubSub, CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonBoundedBlockingQueue(SemaphorePubSub semaphorePubSub, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
this.semaphorePubSub = semaphorePubSub;
this.commandExecutor = commandExecutor;
}
protected RedissonBoundedBlockingQueue(SemaphorePubSub semaphorePubSub, Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
protected RedissonBoundedBlockingQueue(SemaphorePubSub semaphorePubSub, Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
this.semaphorePubSub = semaphorePubSub;
this.commandExecutor = commandExecutor;
}

@ -492,11 +492,6 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return get(pollLastAndOfferFirstToAsync(dequeName));
}
@Override
public V pollLastAndOfferFirstTo(RQueue<V> deque) {
return get(pollLastAndOfferFirstToAsync(deque.getName()));
}
@Override
public void destroy() {
queueTransferService.remove(getQueueName());

@ -20,6 +20,7 @@ import java.util.NoSuchElementException;
import org.redisson.api.RDeque;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -41,12 +42,12 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonDeque(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonDeque(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
public RedissonDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
public RedissonDeque(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}
@Override

@ -39,9 +39,12 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.CronSchedule;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
@ -63,8 +66,10 @@ import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -87,9 +92,16 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String schedulerQueueName;
private final String schedulerChannelName;
private final String workersChannelName;
private final String workersSemaphoreName;
private final String workersCounterName;
private final String tasksCounterName;
private final String statusName;
private final RTopic<Integer> terminationTopic;
private final RRemoteService remoteService;
private final RTopic<String> workersTopic;
private int workersGroupListenerId;
private final RemoteExecutorServiceAsync asyncScheduledService;
private final RemoteExecutorServiceAsync asyncScheduledServiceAtFixed;
@ -120,6 +132,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
schedulerQueueName = objectName + ":scheduler";
schedulerTasksName = objectName + ":scheduler-tasks";
workersChannelName = objectName + ":workers-channel";
workersSemaphoreName = objectName + ":workers-semaphore";
workersCounterName = objectName + ":workers-counter";
remoteService = redisson.getRemoteService(name, codec);
workersTopic = redisson.getTopic(workersChannelName);
ExecutorRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
remoteService.setTasksCounterName(tasksCounterName);
@ -138,13 +157,36 @@ public class RedissonExecutorService implements RScheduledExecutorService {
asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
}
protected String generateRequestId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public int countActiveWorkers() {
String id = generateRequestId();
int subscribers = (int) workersTopic.publish(id);
RSemaphore semaphore = redisson.getSemaphore(workersSemaphoreName + ":" + id);
try {
semaphore.tryAcquire(subscribers, 10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
RAtomicLong atomicLong = redisson.getAtomicLong(workersCounterName + ":" + id);
long result = atomicLong.get();
redisson.getKeys().delete(semaphore, atomicLong);
return (int) result;
}
@Override
public void registerWorkers(int workers) {
registerWorkers(workers, commandExecutor.getConnectionManager().getExecutor());
}
@Override
public void registerWorkers(int workers, ExecutorService executor) {
public void registerWorkers(final int workers, ExecutorService executor) {
QueueTransferTask scheduler = new QueueTransferTask(connectionManager) {
@Override
protected RTopic<Long> getTopic() {
@ -181,7 +223,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
service.setSchedulerChannelName(schedulerChannelName);
service.setSchedulerQueueName(schedulerQueueName);
redisson.getRemoteService(name, codec).register(RemoteExecutorService.class, service, workers, executor);
remoteService.register(RemoteExecutorService.class, service, workers, executor);
workersGroupListenerId = workersTopic.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String id) {
redisson.getAtomicLong(workersCounterName + ":" + id).getAndAdd(workers);
redisson.getSemaphore(workersSemaphoreName + ":" + id).release();
}
});
}
@Override
@ -238,6 +287,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public void shutdown() {
remoteService.deregister(RemoteExecutorService.class);
workersTopic.removeListener(workersGroupListenerId);
commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then "

@ -27,6 +27,7 @@ import org.redisson.api.GeoPosition;
import org.redisson.api.GeoUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RGeo;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.GeoEntryCodec;
import org.redisson.client.codec.ScoredCodec;
@ -55,14 +56,14 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
MultiDecoder<Map<Object, Object>> postitionDecoder;
MultiDecoder<Map<Object, Object>> distanceDecoder;
public RedissonGeo(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
public RedissonGeo(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name, redisson);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}
public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(codec, connectionManager, name, redisson);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}

@ -21,14 +21,21 @@ import java.util.List;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
/**
* Sorted set contained values of String type
*
* @author Nikita Koksharov
*
*/
public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implements RLexSortedSet {
public RedissonLexSortedSet(CommandAsyncExecutor commandExecutor, String name) {
super(StringCodec.INSTANCE, commandExecutor, name);
public RedissonLexSortedSet(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(StringCodec.INSTANCE, commandExecutor, name, redisson);
}
@Override

@ -34,7 +34,9 @@ import java.util.NoSuchElementException;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -44,6 +46,7 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
/**
* Distributed and concurrent implementation of {@link java.util.List}
@ -56,14 +59,23 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public static final RedisCommand<Boolean> EVAL_BOOLEAN_ARGS2 = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS);
public RedissonList(CommandAsyncExecutor commandExecutor, String name) {
private RedissonClient redisson;
public RedissonList(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.redisson = redisson;
}
public RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
this.redisson = redisson;
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override
public int size() {
return get(sizeAsync());

@ -185,7 +185,7 @@ public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implement
final String keyHash = hash(keyState);
final String setName = getValuesName(keyHash);
return new RedissonList<V>(codec, commandExecutor, setName) {
return new RedissonList<V>(codec, commandExecutor, setName, null) {
@Override
public RFuture<Boolean> deleteAsync() {

@ -28,7 +28,7 @@ public class RedissonListMultimapIterator<K, V, M> extends RedissonMultiMapItera
@Override
protected Iterator<V> getIterator(String name) {
RedissonList<V> set = new RedissonList<V>(codec, commandExecutor, map.getValuesName(name));
RedissonList<V> set = new RedissonList<V>(codec, commandExecutor, map.getValuesName(name), null);
return set.iterator();
}

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -68,7 +69,12 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
super(codec, commandExecutor, name);
this.timeoutSetName = timeoutSetName;
this.key = key;
this.list = new RedissonList<V>(codec, commandExecutor, name);
this.list = new RedissonList<V>(codec, commandExecutor, name, null);
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return null;
}
@Override

@ -37,6 +37,7 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
@ -193,13 +194,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidateEntryOnChange;
private int invalidationListenerId;
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
super(id, commandExecutor, name);
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
init(id, name, options);
}
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) {
super(id, codec, connectionManager, name);
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, RedissonClient redisson) {
super(id, codec, connectionManager, name, redisson);
init(id, name, options);
}

@ -34,6 +34,8 @@ import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
@ -47,6 +49,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.Hash;
/**
@ -67,15 +70,23 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private final UUID id;
private final RedissonClient redisson;
protected RedissonMap(UUID id, CommandAsyncExecutor commandExecutor, String name) {
protected RedissonMap(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.id = id;
this.redisson = redisson;
}
public RedissonMap(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonMap(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
this.id = id;
this.redisson = redisson;
}
@Override
public <KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce() {
return new RedissonMapReduce<K, V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override
@ -403,10 +414,6 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), key);
}
protected String getName(Object key) {
return getName();
}
@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
@ -86,21 +87,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE);
static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
RedissonMapCache(UUID id, CommandAsyncExecutor commandExecutor, String name) {
super(id, commandExecutor, name);
RedissonMapCache(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
}
RedissonMapCache(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(id, codec, commandExecutor, name);
RedissonMapCache(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, codec, commandExecutor, name, redisson);
}
public RedissonMapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(id, commandExecutor, name);
public RedissonMapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
}
public RedissonMapCache(UUID id, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(id, codec, commandExecutor, name);
public RedissonMapCache(UUID id, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, codec, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
}

@ -20,10 +20,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map.Entry;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -134,6 +134,12 @@ public class RedissonNode {
config.getRedissonNodeInitializer().onStartup(this);
}
int mapReduceWorkers = config.getMapReduceWorkers();
if (mapReduceWorkers == 0) {
mapReduceWorkers = Runtime.getRuntime().availableProcessors();
}
redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(mapReduceWorkers);
for (Entry<String, Integer> entry : config.getExecutorServiceWorkers().entrySet()) {
String name = entry.getKey();
int workers = entry.getValue();

@ -93,7 +93,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
private RBucket<String> comparatorHolder;
protected RedissonPriorityQueue(CommandExecutor commandExecutor, String name, Redisson redisson) {
super(commandExecutor, name);
super(commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
@ -103,7 +103,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
}
public RedissonPriorityQueue(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) {
super(codec, commandExecutor, name);
super(codec, commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -33,12 +34,12 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
protected RedissonQueue(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
protected RedissonQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
protected RedissonQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}
@Override
@ -115,9 +116,4 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOPLPUSH, getName(), queueName);
}
@Override
public V pollLastAndOfferFirstTo(RQueue<V> queue) {
return pollLastAndOfferFirstTo(queue.getName());
}
}

@ -30,7 +30,9 @@ import java.util.Map.Entry;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
@ -44,6 +46,7 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
/**
*
@ -53,12 +56,21 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RScoredSortedSet<V> {
public RedissonScoredSortedSet(CommandAsyncExecutor commandExecutor, String name) {
private RedissonClient redisson;
public RedissonScoredSortedSet(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.redisson = redisson;
}
public RedissonScoredSortedSet(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonScoredSortedSet(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
this.redisson = redisson;
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override

@ -26,7 +26,9 @@ import java.util.Set;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -36,6 +38,7 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -46,12 +49,21 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIterator {
protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) {
RedissonClient redisson;
protected RedissonSet(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.redisson = redisson;
}
public RedissonSet(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonSet(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
this.redisson = redisson;
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override
@ -79,10 +91,6 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return commandExecutor.readAsync(getName(o), codec, RedisCommands.SISMEMBER, getName(o), o);
}
protected String getName(Object o) {
return getName();
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);

@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -38,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
/**
* <p>Set-based cache with ability to set TTL for each entry via
@ -59,22 +62,23 @@ import org.redisson.eviction.EvictionScheduler;
*/
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V>, ScanIterator {
RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
RedissonClient redisson;
public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
evictionScheduler.schedule(getName());
this.redisson = redisson;
}
public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
evictionScheduler.schedule(getName());
this.redisson = redisson;
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override

@ -162,7 +162,7 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
final String keyHash = hash(keyState);
final String setName = getValuesName(keyHash);
return new RedissonSet<V>(codec, commandExecutor, setName) {
return new RedissonSet<V>(codec, commandExecutor, setName, null) {
@Override
public RFuture<Boolean> deleteAsync() {

@ -28,7 +28,7 @@ public class RedissonSetMultimapIterator<K, V, M> extends RedissonMultiMapIterat
@Override
protected Iterator<V> getIterator(String name) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, map.getValuesName(name));
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, map.getValuesName(name), null);
return set.iterator();
}

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -66,7 +67,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
super(codec, commandExecutor, name);
this.timeoutSetName = timeoutSetName;
this.key = key;
this.set = new RedissonSet<V>(codec, commandExecutor, name);
this.set = new RedissonSet<V>(codec, commandExecutor, name, null);
}
@Override
@ -74,6 +75,11 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return get(sizeAsync());
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return null;
}
@Override
public RFuture<Boolean> clearExpireAsync() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");

@ -32,10 +32,13 @@ import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RPromise;
/**
@ -94,10 +97,12 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
private RLock lock;
private RedissonList<V> list;
private RBucket<String> comparatorHolder;
private RedissonClient redisson;
protected RedissonSortedSet(CommandExecutor commandExecutor, String name, Redisson redisson) {
protected RedissonSortedSet(CommandExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.redisson = redisson;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
@ -116,6 +121,11 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
loadComparator();
}
@Override
public <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce() {
return new RedissonCollectionMapReduce<V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
private void loadComparator() {
try {

@ -58,7 +58,7 @@ public class RedissonSubList<V> extends RedissonList<V> implements RList<V> {
int size = -1;
protected RedissonSubList(Codec codec, CommandAsyncExecutor commandExecutor, String name, int fromIndex, int toIndex) {
super(codec, commandExecutor, name);
super(codec, commandExecutor, name, null);
this.fromIndex = fromIndex;
this.toIndex.set(toIndex);
}

@ -19,13 +19,18 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
* Distributed implementation of {@link java.util.concurrent.ExecutorService}
* Redis based implementation of {@link java.util.concurrent.ExecutorService}
*
* @author Nikita Koksharov
*
*/
public interface RExecutorService extends ExecutorService, RExecutorServiceAsync {
/**
* MapReduce's executor name
*/
String MAPREDUCE_NAME = "redisson_mapreduce";
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
@ -85,11 +90,18 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
void registerWorkers(int workers);
/**
* Register workers with custom executor which executes each task
* Register workers with custom executor
*
* @param workers - workers amount
* @param executor - executor instance
*/
void registerWorkers(int workers, ExecutorService executor);
/**
* Returns active worker groups
*
* @return active worker groups count
*/
int countActiveWorkers();
}

@ -18,7 +18,7 @@ package org.redisson.api;
import java.util.concurrent.Callable;
/**
* Distributed implementation of {@link java.util.concurrent.ExecutorService}
* Redis based implementation of {@link java.util.concurrent.ExecutorService}
*
* @author Nikita Koksharov
*

@ -27,6 +27,7 @@ public interface RKeys extends RKeysAsync {
/**
* Update the last access time of an object.
*
* @param names of keys
* @return count of objects were touched
*/
long touch(String... names);
@ -138,7 +139,7 @@ public interface RKeys extends RKeysAsync {
/**
* Delete multiple objects
*
* @param objects
* @param objects of Redisson
* @return number of removed keys
*/
long delete(RObject ... objects);

@ -27,6 +27,7 @@ public interface RKeysAsync {
/**
* Update the last access time of an object.
*
* @param names of keys
* @return count of objects were touched
*/
RFuture<Long> touchAsync(String... names);
@ -94,7 +95,7 @@ public interface RKeysAsync {
/**
* Delete multiple objects
*
* @param objects
* @param objects of Redisson
* @return number of removed keys
*/
RFuture<Long> deleteAsync(RObject ... objects);

@ -18,8 +18,25 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Set;
import org.redisson.api.mapreduce.RCollectionMapReduce;
/**
* Sorted set contained values of String type
*
* @author Nikita Koksharov
*
*/
public interface RLexSortedSet extends RLexSortedSetAsync, Set<String>, RExpirable {
/**
* Returns <code>RMapReduce</code> object associated with this object
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<String, KOut, VOut> mapReduce();
String pollFirst();
String pollLast();

@ -17,6 +17,12 @@ package org.redisson.api;
import java.util.Collection;
/**
* Sorted set contained values of String type
*
* @author Nikita Koksharov
*
*/
public interface RLexSortedSetAsync extends RCollectionAsync<String> {
RFuture<String> pollLastAsync();

@ -18,6 +18,8 @@ package org.redisson.api;
import java.util.List;
import java.util.RandomAccess;
import org.redisson.api.mapreduce.RCollectionMapReduce;
/**
* Distributed and concurrent implementation of {@link java.util.List}
*
@ -27,6 +29,15 @@ import java.util.RandomAccess;
*/
public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<List<V>>, RandomAccess {
/**
* Returns <code>RMapReduce</code> object associated with this map
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Add <code>element</code> after <code>elementToFind</code>
*

@ -20,6 +20,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.mapreduce.RMapReduce;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
* and {@link java.util.Map}
@ -33,6 +35,15 @@ import java.util.concurrent.ConcurrentMap;
*/
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> {
/**
* Returns <code>RMapReduce</code> object associated with this map
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce();
/**
* Returns <code>RReadWriteLock</code> instance associated with key
*

@ -29,9 +29,6 @@ public interface RQueue<V> extends Queue<V>, RExpirable, RQueueAsync<V> {
V pollLastAndOfferFirstTo(String dequeName);
@Deprecated
V pollLastAndOfferFirstTo(RQueue<V> deque);
List<V> readAll();
}

@ -21,7 +21,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Distributed implementation of {@link java.util.concurrent.ScheduledExecutorService}
* Redis based implementation of {@link java.util.concurrent.ScheduledExecutorService}
*
* @author Nikita Koksharov
*

@ -19,7 +19,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* Distributed implementation of {@link java.util.concurrent.ScheduledExecutorService}
* Redis based implementation of {@link java.util.concurrent.ScheduledExecutorService}
*
* @author Nikita Koksharov
*

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.protocol.ScoredEntry;
/**
@ -35,6 +36,15 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
}
/**
* Returns <code>RMapReduce</code> object associated with this object
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
V pollFirst();
V pollLast();

@ -17,6 +17,8 @@ package org.redisson.api;
import java.util.Set;
import org.redisson.api.mapreduce.RCollectionMapReduce;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
@ -26,6 +28,15 @@ import java.util.Set;
*/
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> {
/**
* Returns <code>RMapReduce</code> object associated with this object
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Removes and returns random elements from set
*

@ -18,6 +18,8 @@ package org.redisson.api;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.mapreduce.RCollectionMapReduce;
/**
* <p>Set-based cache with ability to set TTL for each object.
* </p>
@ -37,6 +39,15 @@ import java.util.concurrent.TimeUnit;
*/
public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V> {
/**
* Returns <code>RMapReduce</code> object associated with this map
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Stores value with specified time to live.
* Value expires after specified time to live.

@ -19,6 +19,8 @@ import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import org.redisson.api.mapreduce.RCollectionMapReduce;
/**
*
* @author Nikita Koksharov
@ -27,6 +29,15 @@ import java.util.SortedSet;
*/
public interface RSortedSet<V> extends SortedSet<V>, RObject {
/**
* Returns <code>RMapReduce</code> object associated with this object
*
* @param <KOut> output key
* @param <VOut> output value
* @return MapReduce instance
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
Set<V> readAll();
RFuture<Set<V>> readAllAsync();

@ -0,0 +1,41 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
import java.io.Serializable;
import java.util.Map;
/**
* Collates result from {@link RReducer} tasks and produces a single result object.
* Executes only once.
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
* @param <R> result type
*/
public interface RCollator<K, V, R> extends Serializable {
/**
* Collates result map from reduce phase of MapReduce process.
*
* @param resultMap contains reduced entires
* @return single result object
*/
R collate(Map<K, V> resultMap);
}

@ -0,0 +1,107 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
import org.redisson.api.RList;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSortedSet;
/**
*
* MapReduce allows to process large amount of data stored in
* {@link RSet}, {@link RList}, {@link RSetCache}, {@link RScoredSortedSet}, {@link RSortedSet} and others
* using Mapper, Reducer and/or Collator tasks launched across Redisson Nodes.
* <p>
* Usage example:
*
* <pre>
* public class WordMapper implements RCollectionMapper&lt;String, String, Integer&gt; {
*
* public void map(String value, RCollector&lt;String, Integer&gt; collector) {
* String[] words = value.split(&quot;[^a-zA-Z]&quot;);
* for (String word : words) {
* collector.emit(word, 1);
* }
* }
*
* }
*
* public class WordReducer implements RReducer&lt;String, Integer&gt; {
*
* public Integer reduce(String reducedKey, Iterator&lt;Integer&gt; iter) {
* int sum = 0;
* while (iter.hasNext()) {
* Integer i = (Integer) iter.next();
* sum += i;
* }
* return sum;
* }
*
* }
*
* public class WordCollator implements RCollator&lt;String, Integer, Integer&gt; {
*
* public Integer collate(Map&lt;String, Integer&gt; resultMap) {
* int result = 0;
* for (Integer count : resultMap.values()) {
* result += count;
* }
* return result;
* }
*
* }
*
* RList&lt;String&gt; list = redisson.getList(&quot;myWords&quot;);
*
* Map&lt;String, Integer&gt; wordsCount = list.&lt;String, Integer&gt;mapReduce()
* .mapper(new WordMapper())
* .reducer(new WordReducer())
* .execute();
*
* Integer totalCount = list.&lt;String, Integer&gt;mapReduce()
* .mapper(new WordMapper())
* .reducer(new WordReducer())
* .execute(new WordCollator());
*
* </pre>
*
* @author Nikita Koksharov
*
* @param <VIn> input value
* @param <KOut> output key
* @param <VOut> output value
*/
public interface RCollectionMapReduce<VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Setup Mapper object
*
* @param mapper used during MapReduce
* @return self instance
*/
RCollectionMapReduce<VIn, KOut, VOut> mapper(RCollectionMapper<VIn, KOut, VOut> mapper);
/**
* Setup Reducer object
*
* @param reducer used during MapReduce
* @return self instance
*/
RCollectionMapReduce<VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reducer);
}

@ -0,0 +1,40 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
/**
* Mapper task invoked during map phase of MapReduce process and launched across Redisson Nodes.
* Every task stores transformed result of input key and value into {@link RCollector} instance.
* Collected results are handled by {@link RReducer} instance once
* all Mapper tasks have finished.
*
* @author Nikita Koksharov
*
* @param <VIn> input value
* @param <KOut> output key
* @param <VOut> output value
*/
public interface RCollectionMapper<VIn, KOut, VOut> {
/**
* Invoked for each Collection source entry
*
* @param value - input value
* @param collector - instance shared across all Mapper tasks
*/
void map(VIn value, RCollector<KOut, VOut> collector);
}

@ -0,0 +1,38 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
/**
* Stores each key/value mapping during map phase of MapReduce process.
* Later used in reduce phase.
*
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RCollector<K, V> {
/**
* Store key/value
*
* @param key available to reduce
* @param value available to reduce
*/
void emit(K key, V value);
}

@ -0,0 +1,101 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
/**
*
* MapReduce allows to process large amount of data stored in Redis map
* using Mapper, Reducer and/or Collator tasks launched across Redisson Nodes.
* <p>
* Usage example:
*
* <pre>
* public class WordMapper implements RMapper&lt;String, String, String, Integer&gt; {
*
* public void map(String key, String value, RCollector&lt;String, Integer&gt; collector) {
* String[] words = value.split(&quot;[^a-zA-Z]&quot;);
* for (String word : words) {
* collector.emit(word, 1);
* }
* }
*
* }
*
* public class WordReducer implements RReducer&lt;String, Integer&gt; {
*
* public Integer reduce(String reducedKey, Iterator&lt;Integer&gt; iter) {
* int sum = 0;
* while (iter.hasNext()) {
* Integer i = (Integer) iter.next();
* sum += i;
* }
* return sum;
* }
*
* }
*
* public class WordCollator implements RCollator&lt;String, Integer, Integer&gt; {
*
* public Integer collate(Map&lt;String, Integer&gt; resultMap) {
* int result = 0;
* for (Integer count : resultMap.values()) {
* result += count;
* }
* return result;
* }
*
* }
*
* RMap&lt;String, String&gt; map = redisson.getMap(&quot;myWords&quot;);
*
* Map&lt;String, Integer&gt; wordsCount = map.&lt;String, Integer&gt;mapReduce()
* .mapper(new WordMapper())
* .reducer(new WordReducer())
* .execute();
*
* Integer totalCount = map.&lt;String, Integer&gt;mapReduce()
* .mapper(new WordMapper())
* .reducer(new WordReducer())
* .execute(new WordCollator());
*
* </pre>
*
* @author Nikita Koksharov
*
* @param <KIn> input key
* @param <VIn> input value
* @param <KOut> output key
* @param <VOut> output value
*/
public interface RMapReduce<KIn, VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Setup Mapper object
*
* @param mapper used during MapReduce
* @return self instance
*/
RMapReduce<KIn, VIn, KOut, VOut> mapper(RMapper<KIn, VIn, KOut, VOut> mapper);
/**
* Setup Reducer object
*
* @param reducer used during MapReduce
* @return self instance
*/
RMapReduce<KIn, VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reducer);
}

@ -0,0 +1,85 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
import java.util.Map;
import org.redisson.api.RFuture;
/**
* Contains methods for MapReduce process execution.
*
* @author Nikita Koksharov
*
* @param <VIn> input value
* @param <KOut> output key
* @param <VOut> output value
*/
public interface RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Executes MapReduce process across Redisson Nodes
*
* @return map containing reduced keys and values
*/
Map<KOut, VOut> execute();
/**
* Executes MapReduce process across Redisson Nodes
* in asynchronous mode
*
* @return map containing reduced keys and values
*/
RFuture<Map<KOut, VOut>> executeAsync();
/**
* Executes MapReduce process across Redisson Nodes
* and stores result in map with <code>resultMapName</code>
*
* @param resultMapName - destination map name
*/
void execute(String resultMapName);
/**
* Executes MapReduce process across Redisson Nodes
* in asynchronous mode and stores result in map with <code>resultMapName</code>
*
* @param resultMapName - destination map name
* @return void
*/
RFuture<Void> executeAsync(String resultMapName);
/**
* Executes MapReduce process across Redisson Nodes
* and collides result using defined <code>collator</code>
*
* @param <R> result type
* @param collator applied to result
* @return collated result
*/
<R> R execute(RCollator<KOut, VOut, R> collator);
/**
* Executes MapReduce process across Redisson Nodes
* in asynchronous mode and collides result using defined <code>collator</code>
*
* @param <R> result type
* @param collator applied to result
* @return collated result
*/
<R> RFuture<R> executeAsync(RCollator<KOut, VOut, R> collator);
}

@ -0,0 +1,44 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
import java.io.Serializable;
/**
* Mapper task invoked during map phase of MapReduce process and launched across Redisson Nodes.
* Every task stores transformed result of input key/value into {@link RCollector} instance.
* Collected results are handled by {@link RReducer} instance once
* all Mapper tasks have finished.
*
* @author Nikita Koksharov
*
* @param <KIn> input key
* @param <VIn> input value
* @param <KOut> output key
* @param <VOut> output value
*/
public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {
/**
* Invoked for each Map source entry
*
* @param key - input key
* @param value - input value
* @param collector - instance shared across all Mapper tasks
*/
void map(KIn key, VIn value, RCollector<KOut, VOut> collector);
}

@ -0,0 +1,40 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.mapreduce;
import java.io.Serializable;
import java.util.Iterator;
/**
* Reduces values mapped by key into single value.
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RReducer<K, V> extends Serializable {
/**
* Invoked for each key
*
* @param reducedKey - key
* @param iter - collection of values
* @return value
*/
V reduce(K reducedKey, Iterator<V> iter);
}

@ -319,7 +319,7 @@ public class Config {
*
* @see ReplicatedConnectionManager on how to implement a connection
* manager.
* @param connectionManager
* @param connectionManager for supply
*/
public void useCustomServers(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;

@ -23,12 +23,14 @@ import java.util.Map;
import org.redisson.api.RedissonNodeInitializer;
/**
* Redisson Node configuration
*
* @author Nikita Koksharov
*
*/
public class RedissonNodeConfig extends Config {
private int mapReduceWorkers = 0;
private RedissonNodeInitializer redissonNodeInitializer;
private Map<String, Integer> executorServiceWorkers = new HashMap<String, Integer>();
@ -46,6 +48,23 @@ public class RedissonNodeConfig extends Config {
this.redissonNodeInitializer = oldConf.redissonNodeInitializer;
}
/**
* MapReduce workers amount.
* 0 = current_processors_amount
* <p>
* Default is <code>0</code>
*
* @param mapReduceWorkers workers for MapReduce
* @return config
*/
public RedissonNodeConfig setMapReduceWorkers(int mapReduceWorkers) {
this.mapReduceWorkers = mapReduceWorkers;
return this;
}
public int getMapReduceWorkers() {
return mapReduceWorkers;
}
/**
* Executor service workers amount per service name
*
@ -60,10 +79,6 @@ public class RedissonNodeConfig extends Config {
return executorServiceWorkers;
}
public RedissonNodeInitializer getRedissonNodeInitializer() {
return redissonNodeInitializer;
}
/**
* Redisson node initializer
*
@ -74,6 +89,10 @@ public class RedissonNodeConfig extends Config {
this.redissonNodeInitializer = redissonNodeInitializer;
return this;
}
public RedissonNodeInitializer getRedissonNodeInitializer() {
return redissonNodeInitializer;
}
/**
* Read config object stored in JSON format from <code>File</code>

@ -55,8 +55,8 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ -37,7 +37,7 @@ import io.netty.util.concurrent.FutureListener;
*
* @author Nikita Koksharov
*
* @param <V>
* @param <V> value type
*/
public class RedissonCompletionService<V> implements CompletionService<V> {

@ -29,6 +29,7 @@ import org.redisson.api.annotation.RInject;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Injector;
import org.redisson.remote.RemoteParams;
import io.netty.buffer.ByteBuf;
@ -183,18 +184,7 @@ public class RemoteExecutorServiceImpl implements RemoteExecutorService, RemoteP
private <T> T decode(ByteBuf buf) throws IOException {
T task = (T) codec.getValueDecoder().decode(buf, null);
Field[] fields = task.getClass().getDeclaredFields();
for (Field field : fields) {
if (RedissonClient.class.isAssignableFrom(field.getType())
&& field.isAnnotationPresent(RInject.class)) {
field.setAccessible(true);
try {
field.set(task, redisson);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
Injector.inject(task, redisson);
return task;
}

@ -0,0 +1,94 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.redisson.api.RExecutorService;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
/**
*
* @author Nikita Koksharov
*
* @param <KOut> output key
* @param <VOut> output value
*/
public abstract class BaseMapperTask<KOut, VOut> implements Callable<Integer>, Serializable {
private static final long serialVersionUID = 7559371478909848610L;
@RInject
protected RedissonClient redisson;
private RReducer<KOut, VOut> reducer;
protected String objectName;
protected Class<?> objectClass;
private Class<?> objectCodecClass;
private String semaphoreName;
private String resultMapName;
protected Codec codec;
public BaseMapperTask() {
}
public BaseMapperTask(RReducer<KOut, VOut> reducer,
String mapName, String semaphoreName, String resultMapName, Class<?> mapCodecClass, Class<?> objectClass) {
super();
this.reducer = reducer;
this.objectName = mapName;
this.objectCodecClass = mapCodecClass;
this.objectClass = objectClass;
this.semaphoreName = semaphoreName;
this.resultMapName = resultMapName;
}
@Override
public Integer call() {
try {
this.codec = (Codec) objectCodecClass.getConstructor().newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
RScheduledExecutorService executor = redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME);
int workersAmount = executor.countActiveWorkers();
UUID id = UUID.randomUUID();
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, objectName + ":collector:" + id, workersAmount);
map(collector);
for (int i = 0; i < workersAmount; i++) {
String name = objectName + ":collector:" + id + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, semaphoreName, resultMapName);
executor.submit(runnable);
}
return workersAmount;
}
protected abstract void map(RCollector<KOut, VOut> collector);
}

@ -0,0 +1,73 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.util.concurrent.Callable;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.client.codec.Codec;
import org.redisson.misc.Injector;
/**
*
* @author Nikita Koksharov
*
* @param <KOut> key type
* @param <VOut> value type
* @param <R> result type
*/
public class CollatorTask<KOut, VOut, R> implements Callable<R> {
@RInject
private RedissonClient redisson;
private RCollator<KOut, VOut, R> collator;
private String resultMapName;
private Class<?> codecClass;
private Codec codec;
public CollatorTask() {
}
public CollatorTask(RCollator<KOut, VOut, R> collator, String resultMapName, Class<?> codecClass) {
super();
this.collator = collator;
this.resultMapName = resultMapName;
this.codecClass = codecClass;
}
@Override
public R call() throws Exception {
try {
this.codec = (Codec) codecClass.getConstructor().newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
Injector.inject(collator, redisson);
RMap<KOut, VOut> resultMap = redisson.getMap(resultMapName, codec);
R result = collator.collate(resultMap);
resultMap.delete();
return result;
}
}

@ -0,0 +1,78 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RList;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSortedSet;
import org.redisson.api.mapreduce.RCollectionMapper;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.misc.Injector;
/**
*
* @author Nikita Koksharov
*
* @param <VIn> input value type
* @param <KOut> output key type
* @param <VOut> output value type
*/
public class CollectionMapperTask<VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut> {
private static final long serialVersionUID = -2634049426877164580L;
RCollectionMapper<VIn, KOut, VOut> mapper;
public CollectionMapperTask() {
}
public CollectionMapperTask(RCollectionMapper<VIn, KOut, VOut> mapper, RReducer<KOut, VOut> reducer, String mapName, String semaphoreName, String resultMapName,
Class<?> mapCodecClass, Class<?> mapClass) {
super(reducer, mapName, semaphoreName, resultMapName, mapCodecClass, mapClass);
this.mapper = mapper;
}
@Override
protected void map(RCollector<KOut, VOut> collector) {
Injector.inject(mapper, redisson);
Iterable<VIn> collection = null;
if (RSetCache.class.isAssignableFrom(objectClass)) {
collection = redisson.getSetCache(objectName, codec);
} else if (RSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSet(objectName, codec);
} else if (RSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSortedSet(objectName, codec);
} else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getScoredSortedSet(objectName, codec);
} else if (RLexSortedSet.class.isAssignableFrom(objectClass)) {
collection = (Iterable<VIn>) redisson.getLexSortedSet(objectName);
} else if (RList.class.isAssignableFrom(objectClass)) {
collection = redisson.getList(objectName, codec);
} else {
throw new IllegalStateException("Unable to work with " + objectClass);
}
for (VIn value : collection) {
mapper.map(value, collector);
}
}
}

@ -0,0 +1,63 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.io.IOException;
import org.redisson.api.RListMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.client.codec.Codec;
import net.openhft.hashing.LongHashFunction;
/**
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class Collector<K, V> implements RCollector<K, V> {
private RedissonClient client;
private String name;
private int parts;
private Codec codec;
public Collector(Codec codec, RedissonClient client, String name, int parts) {
super();
this.client = client;
this.name = name;
this.parts = parts;
this.codec = codec;
}
@Override
public void emit(K key, V value) {
try {
byte[] encodedKey = codec.getValueEncoder().encode(key);
long hash = LongHashFunction.xx_r39().hashBytes(encodedKey);
String partName = name + ":" + Math.abs(hash % parts);
RListMultimap<K, V> multimap = client.getListMultimap(partName, codec);
multimap.put(key, value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}

@ -0,0 +1,227 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.redisson.api.RBatch;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMapAsync;
import org.redisson.api.RObject;
import org.redisson.api.RSemaphore;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RMapReduceExecutor;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.RPromise;
import org.redisson.misc.TransferListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
* @param <M> mapper type
* @param <VIn> input value type
* @param <KOut> output key type
* @param <VOut> output value type
*/
abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecutor<VIn, KOut, VOut> {
private final RedissonClient redisson;
private final RExecutorService executorService;
final String semaphoreName;
final String resultMapName;
final Codec objectCodec;
final String objectName;
final Class<?> objectClass;
private ConnectionManager connectionManager;
RReducer<KOut, VOut> reducer;
M mapper;
public MapReduceExecutor(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
this.objectName = object.getName();
this.objectCodec = object.getCodec();
this.objectClass = object.getClass();
this.redisson = redisson;
UUID id = UUID.randomUUID();
this.semaphoreName = object.getName() + ":semaphore:" + id;
this.resultMapName = object.getName() + ":result:" + id;
this.executorService = redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME);
this.connectionManager = connectionManager;
}
protected void check(Object task) {
if (task == null) {
throw new NullPointerException("Task is not defined");
}
if (task.getClass().isAnonymousClass()) {
throw new IllegalArgumentException("Task can't be created using anonymous class");
}
if (task.getClass().isMemberClass()
&& !Modifier.isStatic(task.getClass().getModifiers())) {
throw new IllegalArgumentException("Task class is an inner class and it should be static");
}
}
@Override
public Map<KOut, VOut> execute() {
return connectionManager.getCommandExecutor().get(executeAsync());
}
@Override
public RFuture<Map<KOut, VOut>> executeAsync() {
final RPromise<Map<KOut, VOut>> promise = connectionManager.newPromise();
executeMapperAsync(resultMapName).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
RBatch batch = redisson.createBatch();
RMapAsync<KOut, VOut> resultMap = batch.getMap(resultMapName, objectCodec);
resultMap.readAllMapAsync().addListener(new TransferListener<Map<KOut, VOut>>(promise));
resultMap.deleteAsync();
batch.executeAsync();
}
});
return promise;
}
@Override
public void execute(String resultMapName) {
connectionManager.getCommandExecutor().get(executeAsync(resultMapName));
}
@Override
public RFuture<Void> executeAsync(String resultMapName) {
final RPromise<Void> promise = connectionManager.newPromise();
executeMapperAsync(resultMapName).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
return promise;
}
private RPromise<Void> executeMapperAsync(String resultMapName) {
if (mapper == null) {
throw new NullPointerException("Mapper is not defined");
}
if (reducer == null) {
throw new NullPointerException("Reducer is not defined");
}
final RPromise<Void> promise = connectionManager.newPromise();
Callable<Integer> task = createTask(resultMapName);
executorService.submit(task).addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
Integer workers = future.getNow();
final RSemaphore semaphore = redisson.getSemaphore(semaphoreName);
semaphore.acquireAsync(workers).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
semaphore.deleteAsync().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
}
});
}
});
return promise;
}
protected abstract Callable<Integer> createTask(String resultMapName);
@Override
public <R> R execute(RCollator<KOut, VOut, R> collator) {
return connectionManager.getCommandExecutor().get(executeAsync(collator));
}
@Override
public <R> RFuture<R> executeAsync(final RCollator<KOut, VOut, R> collator) {
check(collator);
final RPromise<R> promise = connectionManager.newPromise();
executeMapperAsync(resultMapName).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
Callable<R> collatorTask = new CollatorTask<KOut, VOut, R>(collator, resultMapName, objectCodec.getClass());
executorService.submit(collatorTask).addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(future.getNow());
}
});
}
});
return promise;
}
}

@ -0,0 +1,67 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.util.Map.Entry;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.misc.Injector;
/**
*
* @author Nikita Koksharov
*
* @param <KIn> input key type
* @param <VIn> input key type
* @param <KOut> output key type
* @param <VOut> output key type
*/
public class MapperTask<KIn, VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut> {
private static final long serialVersionUID = 2441161019495880394L;
RMapper<KIn, VIn, KOut, VOut> mapper;
public MapperTask() {
}
public MapperTask(RMapper<KIn, VIn, KOut, VOut> mapper, RReducer<KOut, VOut> reducer, String mapName, String semaphoreName, String resultMapName,
Class<?> mapCodecClass, Class<?> mapClass) {
super(reducer, mapName, semaphoreName, resultMapName, mapCodecClass, mapClass);
this.mapper = mapper;
}
@Override
protected void map(RCollector<KOut, VOut> collector) {
Injector.inject(mapper, redisson);
RMap<KIn, VIn> map = null;
if (RMapCache.class.isAssignableFrom(objectClass)) {
map = redisson.getMapCache(objectName, codec);
} else {
map = redisson.getMap(objectName, codec);
}
for (Entry<KIn, VIn> entry : map.entrySet()) {
mapper.map(entry.getKey(), entry.getValue(), collector);
}
}
}

@ -0,0 +1,59 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RCollectionMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.connection.ConnectionManager;
/**
*
* @author Nikita Koksharov
*
* @param <VIn> input value type
* @param <KOut> output key type
* @param <VOut> output value type
*/
public class RedissonCollectionMapReduce<VIn, KOut, VOut> extends MapReduceExecutor<RCollectionMapper<VIn, KOut, VOut>, VIn, KOut, VOut>
implements RCollectionMapReduce<VIn, KOut, VOut> {
public RedissonCollectionMapReduce(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
super(object, redisson, connectionManager);
}
@Override
public RCollectionMapReduce<VIn, KOut, VOut> mapper(RCollectionMapper<VIn, KOut, VOut> mapper) {
check(mapper);
this.mapper = mapper;
return this;
}
@Override
public RCollectionMapReduce<VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reducer) {
check(reducer);
this.reducer = reducer;
return this;
}
@Override
protected CollectionMapperTask<VIn, KOut, VOut> createTask(String resultMapName) {
return new CollectionMapperTask<VIn, KOut, VOut>(mapper, reducer, objectName, semaphoreName, resultMapName, objectCodec.getClass(), objectClass);
}
}

@ -0,0 +1,64 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.util.Map;
import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.connection.ConnectionManager;
/**
*
* @author Nikita Koksharov
*
* @param <KIn> input key type
* @param <VIn> input value type
* @param <KOut> output key type
* @param <VOut> output value type
*/
public class RedissonMapReduce<KIn, VIn, KOut, VOut> extends MapReduceExecutor<RMapper<KIn, VIn, KOut, VOut>, VIn, KOut, VOut>
implements RMapReduce<KIn, VIn, KOut, VOut> {
public RedissonMapReduce(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
super(object, redisson, connectionManager);
}
@Override
public RMapReduce<KIn, VIn, KOut, VOut> mapper(RMapper<KIn, VIn, KOut, VOut> mapper) {
check(mapper);
this.mapper = mapper;
return this;
}
@Override
public RMapReduce<KIn, VIn, KOut, VOut> reducer(RReducer<KOut, VOut> reducer) {
check(reducer);
this.reducer = reducer;
return this;
}
@Override
protected Callable<Integer> createTask(String resultMapName) {
return new MapperTask<KIn, VIn, KOut, VOut>(mapper, reducer, objectName, semaphoreName, resultMapName, objectCodec.getClass(), objectClass);
}
}

@ -0,0 +1,82 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.List;
import org.redisson.api.RListMultimap;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import org.redisson.misc.Injector;
/**
*
* @author Nikita Koksharov
*
* @param <KOut> key
* @param <VOut> value
*/
public class ReducerTask<KOut, VOut> implements Runnable, Serializable {
private static final long serialVersionUID = 3556632668150314703L;
@RInject
private RedissonClient redisson;
private String name;
private String semaphoreName;
private String resultMapName;
private RReducer<KOut, VOut> reducer;
private Class<?> codecClass;
private Codec codec;
public ReducerTask() {
}
public ReducerTask(String name, RReducer<KOut, VOut> reducer, Class<?> codecClass, String semaphoreName, String resultMapName) {
this.name = name;
this.reducer = reducer;
this.semaphoreName = semaphoreName;
this.resultMapName = resultMapName;
this.codecClass = codecClass;
}
@Override
public void run() {
try {
this.codec = (Codec) codecClass.getConstructor().newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
Injector.inject(reducer, redisson);
RMap<KOut, VOut> map = redisson.getMap(resultMapName);
RListMultimap<KOut, VOut> multimap = redisson.getListMultimap(name, codec);
for (KOut key : multimap.keySet()) {
List<VOut> values = multimap.get(key);
VOut out = reducer.reduce(key, values.iterator());
map.put(key, out);
}
multimap.delete();
redisson.getSemaphore(semaphoreName).release();
}
}

@ -0,0 +1,63 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
/**
*
* @author Nikita Koksharov
*
*/
public class Injector {
public static void inject(Object task, RedissonClient redisson) {
List<Field> allFields = new ArrayList<Field>();
Class<?> clazz = task.getClass();
while (true) {
if (clazz != null) {
Field[] fields = clazz.getDeclaredFields();
allFields.addAll(Arrays.asList(fields));
} else {
break;
}
if (clazz.getSuperclass() != Object.class) {
clazz = clazz.getSuperclass();
} else {
clazz = null;
}
}
for (Field field : allFields) {
if (RedissonClient.class.isAssignableFrom(field.getType())
&& field.isAnnotationPresent(RInject.class)) {
field.setAccessible(true);
try {
field.set(task, redisson);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
}
}

@ -13,9 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.pubsub;
import org.redisson.misc.RPromise;
package org.redisson.misc;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

@ -27,6 +27,7 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.TransferListener;
import io.netty.util.internal.PlatformDependent;

@ -56,12 +56,12 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonList<V>(commandExecutor, name);
instance = new RedissonList<V>(commandExecutor, name, null);
}
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonList<V>(codec, commandExecutor, name);
instance = new RedissonList<V>(codec, commandExecutor, name, null);
}
@Override

@ -70,12 +70,12 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, evictionScheduler, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, evictionScheduler, commandExecutor, name, null);
}
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, codec, evictionScheduler, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, codec, evictionScheduler, commandExecutor, name, null);
}
@Override

@ -50,12 +50,12 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name, null);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name, null);
}
@Override

@ -59,12 +59,12 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name);
instance = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
}
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name);
instance = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
}
@Override

@ -45,12 +45,12 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
instance = new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, null);
}
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonSet<V>(codec, commandExecutor, name);
instance = new RedissonSet<V>(codec, commandExecutor, name, null);
}
@Override

@ -0,0 +1,229 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.api.RExecutorService;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RCollectionMapper;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RReducer;
@RunWith(Parameterized.class)
public class RedissonCollectionMapReduceTest extends BaseTest {
public static class WordMapper implements RCollectionMapper<String, String, Integer> {
@Override
public void map(String value, RCollector<String, Integer> collector) {
String[] words = value.split("[^a-zA-Z]");
for (String word : words) {
collector.emit(word, 1);
}
}
}
public static class WordReducer implements RReducer<String, Integer> {
@Override
public Integer reduce(String reducedKey, Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
Integer i = (Integer) iter.next();
sum += i;
}
return sum;
}
}
public static class WordCollator implements RCollator<String, Integer, Integer> {
@Override
public Integer collate(Map<String, Integer> resultMap) {
int result = 0;
for (Integer count : resultMap.values()) {
result += count;
}
return result;
}
}
@Parameterized.Parameters(name = "{index} - {0}")
public static Iterable<Object[]> mapClasses() {
return Arrays.asList(new Object[][]{
{RList.class}, {RQueue.class}
});
}
@Parameterized.Parameter(0)
public Class<?> mapClass;
@Before
public void beforeTest() {
redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(3);
}
@Test
public void test() {
RList<String> list = getCollection();
list.add("Alice was beginning to get very tired");
list.add("of sitting by her sister on the bank and");
list.add("of having nothing to do once or twice she");
list.add("had peeped into the book her sister was reading");
list.add("but it had no pictures or conversations in it");
list.add("and what is the use of a book");
list.add("thought Alice without pictures or conversation");
Map<String, Integer> result = new HashMap<>();
result.put("to", 2);
result.put("Alice", 2);
result.put("get", 1);
result.put("beginning", 1);
result.put("sitting", 1);
result.put("do", 1);
result.put("by", 1);
result.put("or", 3);
result.put("into", 1);
result.put("sister", 2);
result.put("on", 1);
result.put("a", 1);
result.put("without", 1);
result.put("and", 2);
result.put("once", 1);
result.put("twice", 1);
result.put("she", 1);
result.put("had", 2);
result.put("reading", 1);
result.put("but", 1);
result.put("it", 2);
result.put("no", 1);
result.put("in", 1);
result.put("what", 1);
result.put("use", 1);
result.put("thought", 1);
result.put("conversation", 1);
result.put("was", 2);
result.put("very", 1);
result.put("tired", 1);
result.put("of", 3);
result.put("her", 2);
result.put("the", 3);
result.put("bank", 1);
result.put("having", 1);
result.put("nothing", 1);
result.put("peeped", 1);
result.put("book", 2);
result.put("pictures", 2);
result.put("conversations", 1);
result.put("is", 1);
RCollectionMapReduce<String, String, Integer> mapReduce = list.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());
assertThat(mapReduce.execute()).isEqualTo(result);
Integer count = mapReduce.execute(new WordCollator());
assertThat(count).isEqualTo(57);
mapReduce.execute("resultMap");
RMap<Object, Object> resultMap = redisson.getMap("resultMap");
assertThat(resultMap).isEqualTo(result);
resultMap.delete();
}
public static class WordMapperInject implements RCollectionMapper<String, String, Integer> {
@RInject
private RedissonClient redisson;
@Override
public void map(String value, RCollector<String, Integer> collector) {
redisson.getAtomicLong("test").incrementAndGet();
String[] words = value.split("[^a-zA-Z]");
for (String word : words) {
collector.emit(word, 1);
}
}
}
public static class WordReducerInject implements RReducer<String, Integer> {
@RInject
private RedissonClient redisson;
@Override
public Integer reduce(String reducedKey, Iterator<Integer> iter) {
redisson.getAtomicLong("test").incrementAndGet();
int sum = 0;
while (iter.hasNext()) {
Integer i = (Integer) iter.next();
sum += i;
}
return sum;
}
}
public static class WordCollatorInject implements RCollator<String, Integer, Integer> {
@RInject
private RedissonClient redisson;
@Override
public Integer collate(Map<String, Integer> resultMap) {
redisson.getAtomicLong("test").incrementAndGet();
int result = 0;
for (Integer count : resultMap.values()) {
result += count;
}
return result;
}
}
@Test
public void testInjector() {
RList<String> list = getCollection();
list.add("Alice was beginning to get very tired");
RCollectionMapReduce<String, String, Integer> mapReduce = list.<String, Integer>mapReduce().mapper(new WordMapperInject()).reducer(new WordReducerInject());
mapReduce.execute();
assertThat(redisson.getAtomicLong("test").get()).isEqualTo(8);
mapReduce.execute(new WordCollatorInject());
assertThat(redisson.getAtomicLong("test").get()).isEqualTo(16 + 1);
}
private RList<String> getCollection() {
RList<String> list = null;
if (RList.class.isAssignableFrom(mapClass)) {
list = redisson.getList("list");
} else if (RQueue.class.isAssignableFrom(mapClass)) {
list = (RList<String>) redisson.<String>getQueue("queue");
}
return list;
}
}

@ -84,7 +84,7 @@ public class RedissonDequeTest extends BaseTest {
queue2.addFirst(5);
queue2.addFirst(4);
queue1.pollLastAndOfferFirstTo(queue2);
queue1.pollLastAndOfferFirstTo(queue2.getName());
assertThat(queue2).containsExactly(3, 4, 5, 6);
}

@ -0,0 +1,228 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.api.RExecutorService;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
@RunWith(Parameterized.class)
public class RedissonMapReduceTest extends BaseTest {
public static class WordMapper implements RMapper<String, String, String, Integer> {
@Override
public void map(String key, String value, RCollector<String, Integer> collector) {
String[] words = value.split("[^a-zA-Z]");
for (String word : words) {
collector.emit(word, 1);
}
}
}
public static class WordReducer implements RReducer<String, Integer> {
@Override
public Integer reduce(String reducedKey, Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
Integer i = (Integer) iter.next();
sum += i;
}
return sum;
}
}
public static class WordCollator implements RCollator<String, Integer, Integer> {
@Override
public Integer collate(Map<String, Integer> resultMap) {
int result = 0;
for (Integer count : resultMap.values()) {
result += count;
}
return result;
}
}
@Parameterized.Parameters(name = "{index} - {0}")
public static Iterable<Object[]> mapClasses() {
return Arrays.asList(new Object[][]{
{RMap.class}, {RMapCache.class}
});
}
@Parameterized.Parameter(0)
public Class<?> mapClass;
@Before
public void beforeTest() {
redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME).registerWorkers(3);
}
@Test
public void test() {
RMap<String, String> map = null;
if (RMapCache.class.isAssignableFrom(mapClass)) {
map = redisson.getMapCache("map");
} else {
map = redisson.getMap("map");
}
map.put("1", "Alice was beginning to get very tired");
map.put("2", "of sitting by her sister on the bank and");
map.put("3", "of having nothing to do once or twice she");
map.put("4", "had peeped into the book her sister was reading");
map.put("5", "but it had no pictures or conversations in it");
map.put("6", "and what is the use of a book");
map.put("7", "thought Alice without pictures or conversation");
Map<String, Integer> result = new HashMap<>();
result.put("to", 2);
result.put("Alice", 2);
result.put("get", 1);
result.put("beginning", 1);
result.put("sitting", 1);
result.put("do", 1);
result.put("by", 1);
result.put("or", 3);
result.put("into", 1);
result.put("sister", 2);
result.put("on", 1);
result.put("a", 1);
result.put("without", 1);
result.put("and", 2);
result.put("once", 1);
result.put("twice", 1);
result.put("she", 1);
result.put("had", 2);
result.put("reading", 1);
result.put("but", 1);
result.put("it", 2);
result.put("no", 1);
result.put("in", 1);
result.put("what", 1);
result.put("use", 1);
result.put("thought", 1);
result.put("conversation", 1);
result.put("was", 2);
result.put("very", 1);
result.put("tired", 1);
result.put("of", 3);
result.put("her", 2);
result.put("the", 3);
result.put("bank", 1);
result.put("having", 1);
result.put("nothing", 1);
result.put("peeped", 1);
result.put("book", 2);
result.put("pictures", 2);
result.put("conversations", 1);
result.put("is", 1);
RMapReduce<String, String, String, Integer> mapReduce = map.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());
assertThat(mapReduce.execute()).isEqualTo(result);
Integer count = mapReduce.execute(new WordCollator());
assertThat(count).isEqualTo(57);
mapReduce.execute("resultMap");
RMap<Object, Object> resultMap = redisson.getMap("resultMap");
assertThat(resultMap).isEqualTo(result);
resultMap.delete();
}
public static class WordMapperInject implements RMapper<String, String, String, Integer> {
@RInject
private RedissonClient redisson;
@Override
public void map(String key, String value, RCollector<String, Integer> collector) {
redisson.getAtomicLong("test").incrementAndGet();
String[] words = value.split("[^a-zA-Z]");
for (String word : words) {
collector.emit(word, 1);
}
}
}
public static class WordReducerInject implements RReducer<String, Integer> {
@RInject
private RedissonClient redisson;
@Override
public Integer reduce(String reducedKey, Iterator<Integer> iter) {
redisson.getAtomicLong("test").incrementAndGet();
int sum = 0;
while (iter.hasNext()) {
Integer i = (Integer) iter.next();
sum += i;
}
return sum;
}
}
public static class WordCollatorInject implements RCollator<String, Integer, Integer> {
@RInject
private RedissonClient redisson;
@Override
public Integer collate(Map<String, Integer> resultMap) {
redisson.getAtomicLong("test").incrementAndGet();
int result = 0;
for (Integer count : resultMap.values()) {
result += count;
}
return result;
}
}
@Test
public void testInjector() {
RMap<String, String> map = null;
if (RMapCache.class.isAssignableFrom(mapClass)) {
map = redisson.getMapCache("map");
} else {
map = redisson.getMap("map");
}
map.put("1", "Alice was beginning to get very tired");
RMapReduce<String, String, String, Integer> mapReduce = map.<String, Integer>mapReduce().mapper(new WordMapperInject()).reducer(new WordReducerInject());
mapReduce.execute();
assertThat(redisson.getAtomicLong("test").get()).isEqualTo(8);
mapReduce.execute(new WordCollatorInject());
assertThat(redisson.getAtomicLong("test").get()).isEqualTo(16 + 1);
}
}
Loading…
Cancel
Save