diff --git a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java index 4c7c37342..f439b4ee7 100644 --- a/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java +++ b/redisson/src/main/java/org/redisson/api/RExecutorServiceAsync.java @@ -39,7 +39,6 @@ public interface RExecutorServiceAsync { * @param task - task to execute * @return Future object */ - @Deprecated RFuture submitAsync(Callable task); /** @@ -48,7 +47,6 @@ public interface RExecutorServiceAsync { * @param task - task to execute * @return Future object */ - @Deprecated RFuture submitAsync(Runnable task); } diff --git a/redisson/src/main/java/org/redisson/api/mapreduce/RCollectionMapReduce.java b/redisson/src/main/java/org/redisson/api/mapreduce/RCollectionMapReduce.java index 0a7efe1c4..dd212490f 100644 --- a/redisson/src/main/java/org/redisson/api/mapreduce/RCollectionMapReduce.java +++ b/redisson/src/main/java/org/redisson/api/mapreduce/RCollectionMapReduce.java @@ -15,6 +15,8 @@ */ package org.redisson.api.mapreduce; +import java.util.concurrent.TimeUnit; + import org.redisson.api.RList; import org.redisson.api.RScoredSortedSet; import org.redisson.api.RSet; @@ -88,6 +90,15 @@ import org.redisson.api.RSortedSet; */ public interface RCollectionMapReduce extends RMapReduceExecutor { + /** + * Defines timeout for MapReduce process + * + * @param timeout + * @param unit + * @return self instance + */ + RCollectionMapReduce timeout(long timeout, TimeUnit unit); + /** * Setup Mapper object * diff --git a/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java b/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java index 2509577db..206e91d35 100644 --- a/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java +++ b/redisson/src/main/java/org/redisson/api/mapreduce/RMapReduce.java @@ -15,6 +15,8 @@ */ package org.redisson.api.mapreduce; +import java.util.concurrent.TimeUnit; + /** * * MapReduce allows to process large amount of data stored in Redis map @@ -82,6 +84,15 @@ package org.redisson.api.mapreduce; */ public interface RMapReduce extends RMapReduceExecutor { + /** + * Defines timeout for MapReduce process + * + * @param timeout + * @param unit + * @return self instance + */ + RMapReduce timeout(long timeout, TimeUnit unit); + /** * Setup Mapper object * diff --git a/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java index 7520dd2a1..7f04f0020 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/BaseMapperTask.java @@ -40,6 +40,7 @@ public abstract class BaseMapperTask implements Runnable, Serializab protected int workersAmount; protected String collectorMapName; + protected long timeout; public BaseMapperTask() { } @@ -51,18 +52,14 @@ public abstract class BaseMapperTask implements Runnable, Serializab this.objectCodecClass = objectCodecClass; } - public int getWorkersAmount() { - return workersAmount; + public void setTimeout(long timeout) { + this.timeout = timeout; } - + public void setWorkersAmount(int workersAmount) { this.workersAmount = workersAmount; } - public String getCollectorMapName() { - return collectorMapName; - } - public void setCollectorMapName(String collatorMapName) { this.collectorMapName = collatorMapName; } diff --git a/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java index 261c83839..6abcf4cdc 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/CollectionMapperTask.java @@ -76,7 +76,7 @@ public class CollectionMapperTask extends BaseMapperTask collector = new Collector(codec, redisson, collectorMapName, workersAmount); + RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount, timeout); for (VIn value : collection) { if (Thread.currentThread().isInterrupted()) { diff --git a/redisson/src/main/java/org/redisson/mapreduce/Collector.java b/redisson/src/main/java/org/redisson/mapreduce/Collector.java index 907c82b68..37f6f5836 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/Collector.java +++ b/redisson/src/main/java/org/redisson/mapreduce/Collector.java @@ -16,6 +16,8 @@ package org.redisson.mapreduce; import java.io.IOException; +import java.util.BitSet; +import java.util.concurrent.TimeUnit; import org.redisson.api.RListMultimap; import org.redisson.api.RedissonClient; @@ -37,13 +39,17 @@ public class Collector implements RCollector { private String name; private int parts; private Codec codec; + private long timeout; + private BitSet expirationsBitSet = new BitSet(); - public Collector(Codec codec, RedissonClient client, String name, int parts) { + public Collector(Codec codec, RedissonClient client, String name, int parts, long timeout) { super(); this.client = client; this.name = name; this.parts = parts; this.codec = codec; + this.timeout = timeout; + expirationsBitSet = new BitSet(parts); } @Override @@ -51,10 +57,15 @@ public class Collector implements RCollector { try { byte[] encodedKey = codec.getValueEncoder().encode(key); long hash = LongHashFunction.xx_r39().hashBytes(encodedKey); - String partName = name + ":" + Math.abs(hash % parts); + int part = (int) Math.abs(hash % parts); + String partName = name + ":" + part; RListMultimap multimap = client.getListMultimap(partName, codec); multimap.put(key, value); + if (timeout > 0 && !expirationsBitSet.get(part)) { + multimap.expire(timeout, TimeUnit.MILLISECONDS); + expirationsBitSet.set(part); + } } catch (IOException e) { throw new IllegalArgumentException(e); } diff --git a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java index 245cae378..e41358bf0 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/CoordinatorTask.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; @@ -74,6 +77,8 @@ public class CoordinatorTask implements Callable, Serializab protected Class objectClass; private Class objectCodecClass; private String resultMapName; + private long timeout; + private long startTime; protected Codec codec; @@ -82,7 +87,7 @@ public class CoordinatorTask implements Callable, Serializab public CoordinatorTask(BaseMapperTask mapperTask, RReducer reducer, String mapName, String resultMapName, Class mapCodecClass, Class objectClass, - RCollator collator) { + RCollator collator, long timeout, long startTime) { super(); this.mapperTask = mapperTask; this.reducer = reducer; @@ -91,49 +96,81 @@ public class CoordinatorTask implements Callable, Serializab this.objectClass = objectClass; this.resultMapName = resultMapName; this.collator = collator; + this.timeout = timeout; + this.startTime = startTime; } @Override public Object call() throws Exception { + long timeSpent = System.currentTimeMillis() - startTime; + if (isTimeoutExpired(timeSpent)) { + throw new MapReduceTimeoutException(); + } + this.codec = (Codec) objectCodecClass.getConstructor().newInstance(); RScheduledExecutorService executor = redisson.getExecutorService(RExecutorService.MAPREDUCE_NAME); int workersAmount = executor.countActiveWorkers(); - + UUID id = UUID.randomUUID(); String collectorMapName = objectName + ":collector:" + id; mapperTask.setCollectorMapName(collectorMapName); mapperTask.setWorkersAmount(workersAmount); - executor.submit(mapperTask).get(); - - if (Thread.currentThread().isInterrupted()) { - return null; + timeSpent = System.currentTimeMillis() - startTime; + if (isTimeoutExpired(timeSpent)) { + throw new MapReduceTimeoutException(); + } + if (timeout > 0) { + mapperTask.setTimeout(timeout - timeSpent); + } + RFuture mapperFuture = executor.submit(mapperTask); + if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) { + mapperFuture.cancel(true); + throw new MapReduceTimeoutException(); + } + if (timeout == 0) { + try { + mapperFuture.await(); + } catch (InterruptedException e) { + return null; + } } List> futures = new ArrayList>(); final CountDownLatch latch = new CountDownLatch(workersAmount); for (int i = 0; i < workersAmount; i++) { String name = collectorMapName + ":" + i; - Runnable runnable = new ReducerTask(name, reducer, objectCodecClass, resultMapName); - RFuture future = executor.submit(runnable); + Runnable runnable = new ReducerTask(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent); + RFuture future = executor.submitAsync(runnable); future.addListener(new LatchListener(latch)); futures.add(future); } if (Thread.currentThread().isInterrupted()) { - for (RFuture future : futures) { - future.cancel(true); - } + cancelReduce(futures); return null; } + timeSpent = System.currentTimeMillis() - startTime; + if (isTimeoutExpired(timeSpent)) { + cancelReduce(futures); + throw new MapReduceTimeoutException(); + } try { - latch.await(); - } catch (InterruptedException e) { - for (RFuture future : futures) { - future.cancel(true); + if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) { + cancelReduce(futures); + throw new MapReduceTimeoutException(); } + if (timeout == 0) { + try { + latch.await(); + } catch (InterruptedException e) { + return null; + } + } + } catch (InterruptedException e) { + cancelReduce(futures); return null; } for (RFuture rFuture : futures) { @@ -142,12 +179,46 @@ public class CoordinatorTask implements Callable, Serializab } } + return executeCollator(); + } + + private Object executeCollator() throws ExecutionException, Exception { if (collator == null) { + if (timeout > 0) { + redisson.getMap(resultMapName).clearExpire(); + } return null; } Callable collatorTask = new CollatorTask(redisson, collator, resultMapName, objectCodecClass); - return collatorTask.call(); + long timeSpent = System.currentTimeMillis() - startTime; + if (isTimeoutExpired(timeSpent)) { + throw new MapReduceTimeoutException(); + } + + if (timeout > 0) { + java.util.concurrent.Future collatorFuture = redisson.getConfig().getExecutor().submit(collatorTask); + try { + return collatorFuture.get(timeout - timeSpent, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } catch (TimeoutException e) { + collatorFuture.cancel(true); + throw new MapReduceTimeoutException(); + } + } else { + return collatorTask.call(); + } + } + + private boolean isTimeoutExpired(long timeSpent) { + return timeSpent > timeout && timeout > 0; + } + + private void cancelReduce(List> futures) { + for (RFuture future : futures) { + future.cancel(true); + } } } diff --git a/redisson/src/main/java/org/redisson/mapreduce/MapReduceExecutor.java b/redisson/src/main/java/org/redisson/mapreduce/MapReduceExecutor.java index 731c2d3e2..a5d1f3aeb 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/MapReduceExecutor.java +++ b/redisson/src/main/java/org/redisson/mapreduce/MapReduceExecutor.java @@ -60,6 +60,7 @@ abstract class MapReduceExecutor implements RMapReduceExecut private ConnectionManager connectionManager; RReducer reducer; M mapper; + long timeout; public MapReduceExecutor(RObject object, RedissonClient redisson, ConnectionManager connectionManager) { this.objectName = object.getName(); diff --git a/redisson/src/main/java/org/redisson/mapreduce/MapReduceTimeoutException.java b/redisson/src/main/java/org/redisson/mapreduce/MapReduceTimeoutException.java new file mode 100644 index 000000000..5b61c9d10 --- /dev/null +++ b/redisson/src/main/java/org/redisson/mapreduce/MapReduceTimeoutException.java @@ -0,0 +1,14 @@ +package org.redisson.mapreduce; + +import org.redisson.client.RedisException; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapReduceTimeoutException extends RedisException { + + private static final long serialVersionUID = -198991995396319360L; + +} diff --git a/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java b/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java index 04829d769..1937e8f92 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/MapperTask.java @@ -65,7 +65,7 @@ public class MapperTask extends BaseMapperTask map = redisson.getMap(objectName, codec); } - RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount); + RCollector collector = new Collector(codec, redisson, collectorMapName, workersAmount, timeout); for (Entry entry : map.entrySet()) { if (Thread.currentThread().isInterrupted()) { diff --git a/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java b/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java index 999aa0963..a443fc1ea 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java +++ b/redisson/src/main/java/org/redisson/mapreduce/RedissonCollectionMapReduce.java @@ -16,6 +16,7 @@ package org.redisson.mapreduce; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.redisson.api.RObject; import org.redisson.api.RedissonClient; @@ -40,6 +41,12 @@ public class RedissonCollectionMapReduce extends MapReduceExecu super(object, redisson, connectionManager); } + @Override + public RCollectionMapReduce timeout(long timeout, TimeUnit unit) { + this.timeout = unit.toMillis(timeout); + return this; + } + @Override public RCollectionMapReduce mapper(RCollectionMapper mapper) { check(mapper); @@ -57,7 +64,7 @@ public class RedissonCollectionMapReduce extends MapReduceExecu @Override protected Callable createTask(String resultMapName, RCollator collator) { CollectionMapperTask mapperTask = new CollectionMapperTask(mapper, objectClass, objectName, objectCodec.getClass()); - return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator); + return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis()); } } diff --git a/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java b/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java index 0b24ff1f7..59cf78a07 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java +++ b/redisson/src/main/java/org/redisson/mapreduce/RedissonMapReduce.java @@ -16,10 +16,12 @@ package org.redisson.mapreduce; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.redisson.api.RObject; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollator; +import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.api.mapreduce.RMapper; import org.redisson.api.mapreduce.RReducer; @@ -41,6 +43,12 @@ public class RedissonMapReduce extends MapReduceExecutor timeout(long timeout, TimeUnit unit) { + this.timeout = unit.toMillis(timeout); + return this; + } + @Override public RMapReduce mapper(RMapper mapper) { check(mapper); @@ -58,7 +66,7 @@ public class RedissonMapReduce extends MapReduceExecutor createTask(String resultMapName, RCollator collator) { MapperTask mapperTask = new MapperTask(mapper, objectClass, objectName, objectCodec.getClass()); - return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator); + return new CoordinatorTask(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis()); } } diff --git a/redisson/src/main/java/org/redisson/mapreduce/ReducerTask.java b/redisson/src/main/java/org/redisson/mapreduce/ReducerTask.java index f0f573bde..ff7ed4ffb 100644 --- a/redisson/src/main/java/org/redisson/mapreduce/ReducerTask.java +++ b/redisson/src/main/java/org/redisson/mapreduce/ReducerTask.java @@ -17,6 +17,7 @@ package org.redisson.mapreduce; import java.io.Serializable; import java.util.List; +import java.util.concurrent.TimeUnit; import org.redisson.api.RListMultimap; import org.redisson.api.RMap; @@ -45,15 +46,17 @@ public class ReducerTask implements Runnable, Serializable { private RReducer reducer; private Class codecClass; private Codec codec; + private long timeout; public ReducerTask() { } - public ReducerTask(String name, RReducer reducer, Class codecClass, String resultMapName) { + public ReducerTask(String name, RReducer reducer, Class codecClass, String resultMapName, long timeout) { this.name = name; this.reducer = reducer; this.resultMapName = resultMapName; this.codecClass = codecClass; + this.timeout = timeout; } @Override @@ -76,6 +79,9 @@ public class ReducerTask implements Runnable, Serializable { VOut out = reducer.reduce(key, values.iterator()); map.put(key, out); } + if (timeout > 0) { + map.expire(timeout, TimeUnit.MILLISECONDS); + } multimap.delete(); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapReduceTest.java b/redisson/src/test/java/org/redisson/RedissonMapReduceTest.java index 2c25dd4c8..80da71518 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapReduceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapReduceTest.java @@ -7,7 +7,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -18,7 +17,6 @@ import org.redisson.api.RExecutorService; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RMapCache; -import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RedissonClient; import org.redisson.api.annotation.RInject; import org.redisson.api.mapreduce.RCollator; @@ -26,9 +24,7 @@ import org.redisson.api.mapreduce.RCollector; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.api.mapreduce.RMapper; import org.redisson.api.mapreduce.RReducer; -import org.redisson.executor.ScheduledRunnableTask; - -import net.bytebuddy.utility.RandomString; +import org.redisson.mapreduce.MapReduceTimeoutException; @RunWith(Parameterized.class) public class RedissonMapReduceTest extends BaseTest { @@ -99,9 +95,22 @@ public class RedissonMapReduceTest extends BaseTest { RFuture> future = mapReduce.executeAsync(); Thread.sleep(100); future.cancel(true); - Thread.sleep(3000); } + @Test(expected = MapReduceTimeoutException.class) + public void testTimeout() { + RMap map = getMap(); + for (int i = 0; i < 100000; i++) { + map.put("" + i, "ab cd fjks"); + } + + RMapReduce mapReduce = map.mapReduce() + .mapper(new WordMapper()) + .reducer(new WordReducer()) + .timeout(1, TimeUnit.SECONDS); + + mapReduce.execute(); + } @Test public void test() {