Introduced timeout parameter for MapReduced. #312

pull/836/head
Nikita 8 years ago
parent 111bb4dd59
commit bdf473a8b8

@ -39,7 +39,6 @@ public interface RExecutorServiceAsync {
* @param task - task to execute
* @return Future object
*/
@Deprecated
<T> RFuture<T> submitAsync(Callable<T> task);
/**
@ -48,7 +47,6 @@ public interface RExecutorServiceAsync {
* @param task - task to execute
* @return Future object
*/
@Deprecated
RFuture<?> submitAsync(Runnable task);
}

@ -15,6 +15,8 @@
*/
package org.redisson.api.mapreduce;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RList;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
@ -88,6 +90,15 @@ import org.redisson.api.RSortedSet;
*/
public interface RCollectionMapReduce<VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Defines timeout for MapReduce process
*
* @param timeout
* @param unit
* @return self instance
*/
RCollectionMapReduce<VIn, KOut, VOut> timeout(long timeout, TimeUnit unit);
/**
* Setup Mapper object
*

@ -15,6 +15,8 @@
*/
package org.redisson.api.mapreduce;
import java.util.concurrent.TimeUnit;
/**
*
* MapReduce allows to process large amount of data stored in Redis map
@ -82,6 +84,15 @@ package org.redisson.api.mapreduce;
*/
public interface RMapReduce<KIn, VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Defines timeout for MapReduce process
*
* @param timeout
* @param unit
* @return self instance
*/
RMapReduce<KIn, VIn, KOut, VOut> timeout(long timeout, TimeUnit unit);
/**
* Setup Mapper object
*

@ -40,6 +40,7 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
protected int workersAmount;
protected String collectorMapName;
protected long timeout;
public BaseMapperTask() {
}
@ -51,18 +52,14 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
this.objectCodecClass = objectCodecClass;
}
public int getWorkersAmount() {
return workersAmount;
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public void setWorkersAmount(int workersAmount) {
this.workersAmount = workersAmount;
}
public String getCollectorMapName() {
return collectorMapName;
}
public void setCollectorMapName(String collatorMapName) {
this.collectorMapName = collatorMapName;
}

@ -76,7 +76,7 @@ public class CollectionMapperTask<VIn, KOut, VOut> extends BaseMapperTask<KOut,
throw new IllegalStateException("Unable to work with " + objectClass);
}
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount);
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);
for (VIn value : collection) {
if (Thread.currentThread().isInterrupted()) {

@ -16,6 +16,8 @@
package org.redisson.mapreduce;
import java.io.IOException;
import java.util.BitSet;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RListMultimap;
import org.redisson.api.RedissonClient;
@ -37,13 +39,17 @@ public class Collector<K, V> implements RCollector<K, V> {
private String name;
private int parts;
private Codec codec;
private long timeout;
private BitSet expirationsBitSet = new BitSet();
public Collector(Codec codec, RedissonClient client, String name, int parts) {
public Collector(Codec codec, RedissonClient client, String name, int parts, long timeout) {
super();
this.client = client;
this.name = name;
this.parts = parts;
this.codec = codec;
this.timeout = timeout;
expirationsBitSet = new BitSet(parts);
}
@Override
@ -51,10 +57,15 @@ public class Collector<K, V> implements RCollector<K, V> {
try {
byte[] encodedKey = codec.getValueEncoder().encode(key);
long hash = LongHashFunction.xx_r39().hashBytes(encodedKey);
String partName = name + ":" + Math.abs(hash % parts);
int part = (int) Math.abs(hash % parts);
String partName = name + ":" + part;
RListMultimap<K, V> multimap = client.getListMultimap(partName, codec);
multimap.put(key, value);
if (timeout > 0 && !expirationsBitSet.get(part)) {
multimap.expire(timeout, TimeUnit.MILLISECONDS);
expirationsBitSet.set(part);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}

@ -21,6 +21,9 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
@ -74,6 +77,8 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
protected Class<?> objectClass;
private Class<?> objectCodecClass;
private String resultMapName;
private long timeout;
private long startTime;
protected Codec codec;
@ -82,7 +87,7 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
public CoordinatorTask(BaseMapperTask<KOut, VOut> mapperTask, RReducer<KOut, VOut> reducer,
String mapName, String resultMapName, Class<?> mapCodecClass, Class<?> objectClass,
RCollator<KOut, VOut, Object> collator) {
RCollator<KOut, VOut, Object> collator, long timeout, long startTime) {
super();
this.mapperTask = mapperTask;
this.reducer = reducer;
@ -91,49 +96,81 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
this.objectClass = objectClass;
this.resultMapName = resultMapName;
this.collator = collator;
this.timeout = timeout;
this.startTime = startTime;
}
@Override
public Object call() throws Exception {
long timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
throw new MapReduceTimeoutException();
}
this.codec = (Codec) objectCodecClass.getConstructor().newInstance();
RScheduledExecutorService executor = redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME);
int workersAmount = executor.countActiveWorkers();
UUID id = UUID.randomUUID();
String collectorMapName = objectName + ":collector:" + id;
mapperTask.setCollectorMapName(collectorMapName);
mapperTask.setWorkersAmount(workersAmount);
executor.submit(mapperTask).get();
if (Thread.currentThread().isInterrupted()) {
return null;
timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
throw new MapReduceTimeoutException();
}
if (timeout > 0) {
mapperTask.setTimeout(timeout - timeSpent);
}
RFuture<?> mapperFuture = executor.submit(mapperTask);
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
mapperFuture.await();
} catch (InterruptedException e) {
return null;
}
}
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
final CountDownLatch latch = new CountDownLatch(workersAmount);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName);
RFuture<?> future = executor.submit(runnable);
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
RFuture<?> future = executor.submitAsync(runnable);
future.addListener(new LatchListener(latch));
futures.add(future);
}
if (Thread.currentThread().isInterrupted()) {
for (RFuture<?> future : futures) {
future.cancel(true);
}
cancelReduce(futures);
return null;
}
timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
cancelReduce(futures);
throw new MapReduceTimeoutException();
}
try {
latch.await();
} catch (InterruptedException e) {
for (RFuture<?> future : futures) {
future.cancel(true);
if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) {
cancelReduce(futures);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
latch.await();
} catch (InterruptedException e) {
return null;
}
}
} catch (InterruptedException e) {
cancelReduce(futures);
return null;
}
for (RFuture<?> rFuture : futures) {
@ -142,12 +179,46 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
}
}
return executeCollator();
}
private Object executeCollator() throws ExecutionException, Exception {
if (collator == null) {
if (timeout > 0) {
redisson.getMap(resultMapName).clearExpire();
}
return null;
}
Callable<Object> collatorTask = new CollatorTask<KOut, VOut, Object>(redisson, collator, resultMapName, objectCodecClass);
return collatorTask.call();
long timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
throw new MapReduceTimeoutException();
}
if (timeout > 0) {
java.util.concurrent.Future<?> collatorFuture = redisson.getConfig().getExecutor().submit(collatorTask);
try {
return collatorFuture.get(timeout - timeSpent, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
} catch (TimeoutException e) {
collatorFuture.cancel(true);
throw new MapReduceTimeoutException();
}
} else {
return collatorTask.call();
}
}
private boolean isTimeoutExpired(long timeSpent) {
return timeSpent > timeout && timeout > 0;
}
private void cancelReduce(List<RFuture<?>> futures) {
for (RFuture<?> future : futures) {
future.cancel(true);
}
}
}

@ -60,6 +60,7 @@ abstract class MapReduceExecutor<M, VIn, KOut, VOut> implements RMapReduceExecut
private ConnectionManager connectionManager;
RReducer<KOut, VOut> reducer;
M mapper;
long timeout;
public MapReduceExecutor(RObject object, RedissonClient redisson, ConnectionManager connectionManager) {
this.objectName = object.getName();

@ -0,0 +1,14 @@
package org.redisson.mapreduce;
import org.redisson.client.RedisException;
/**
*
* @author Nikita Koksharov
*
*/
public class MapReduceTimeoutException extends RedisException {
private static final long serialVersionUID = -198991995396319360L;
}

@ -65,7 +65,7 @@ public class MapperTask<KIn, VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut>
map = redisson.getMap(objectName, codec);
}
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount);
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);
for (Entry<KIn, VIn> entry : map.entrySet()) {
if (Thread.currentThread().isInterrupted()) {

@ -16,6 +16,7 @@
package org.redisson.mapreduce;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
@ -40,6 +41,12 @@ public class RedissonCollectionMapReduce<VIn, KOut, VOut> extends MapReduceExecu
super(object, redisson, connectionManager);
}
@Override
public RCollectionMapReduce<VIn, KOut, VOut> timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
return this;
}
@Override
public RCollectionMapReduce<VIn, KOut, VOut> mapper(RCollectionMapper<VIn, KOut, VOut> mapper) {
check(mapper);
@ -57,7 +64,7 @@ public class RedissonCollectionMapReduce<VIn, KOut, VOut> extends MapReduceExecu
@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
CollectionMapperTask<VIn, KOut, VOut> mapperTask = new CollectionMapperTask<VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator);
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}
}

@ -16,10 +16,12 @@
package org.redisson.mapreduce;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
@ -41,6 +43,12 @@ public class RedissonMapReduce<KIn, VIn, KOut, VOut> extends MapReduceExecutor<R
super(object, redisson, connectionManager);
}
@Override
public RMapReduce<KIn, VIn, KOut, VOut> timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
return this;
}
@Override
public RMapReduce<KIn, VIn, KOut, VOut> mapper(RMapper<KIn, VIn, KOut, VOut> mapper) {
check(mapper);
@ -58,7 +66,7 @@ public class RedissonMapReduce<KIn, VIn, KOut, VOut> extends MapReduceExecutor<R
@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
MapperTask<KIn, VIn, KOut, VOut> mapperTask = new MapperTask<KIn, VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator);
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}
}

@ -17,6 +17,7 @@ package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RListMultimap;
import org.redisson.api.RMap;
@ -45,15 +46,17 @@ public class ReducerTask<KOut, VOut> implements Runnable, Serializable {
private RReducer<KOut, VOut> reducer;
private Class<?> codecClass;
private Codec codec;
private long timeout;
public ReducerTask() {
}
public ReducerTask(String name, RReducer<KOut, VOut> reducer, Class<?> codecClass, String resultMapName) {
public ReducerTask(String name, RReducer<KOut, VOut> reducer, Class<?> codecClass, String resultMapName, long timeout) {
this.name = name;
this.reducer = reducer;
this.resultMapName = resultMapName;
this.codecClass = codecClass;
this.timeout = timeout;
}
@Override
@ -76,6 +79,9 @@ public class ReducerTask<KOut, VOut> implements Runnable, Serializable {
VOut out = reducer.reduce(key, values.iterator());
map.put(key, out);
}
if (timeout > 0) {
map.expire(timeout, TimeUnit.MILLISECONDS);
}
multimap.delete();
}

@ -7,7 +7,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
@ -18,7 +17,6 @@ import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
import org.redisson.api.mapreduce.RCollator;
@ -26,9 +24,7 @@ import org.redisson.api.mapreduce.RCollector;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.executor.ScheduledRunnableTask;
import net.bytebuddy.utility.RandomString;
import org.redisson.mapreduce.MapReduceTimeoutException;
@RunWith(Parameterized.class)
public class RedissonMapReduceTest extends BaseTest {
@ -99,9 +95,22 @@ public class RedissonMapReduceTest extends BaseTest {
RFuture<Map<String, Integer>> future = mapReduce.executeAsync();
Thread.sleep(100);
future.cancel(true);
Thread.sleep(3000);
}
@Test(expected = MapReduceTimeoutException.class)
public void testTimeout() {
RMap<String, String> map = getMap();
for (int i = 0; i < 100000; i++) {
map.put("" + i, "ab cd fjks");
}
RMapReduce<String, String, String, Integer> mapReduce = map.<String, Integer>mapReduce()
.mapper(new WordMapper())
.reducer(new WordReducer())
.timeout(1, TimeUnit.SECONDS);
mapReduce.execute();
}
@Test
public void test() {

Loading…
Cancel
Save