refactoring

pull/841/head
Nikita 8 years ago
parent 5d4d66c83e
commit 242c73897f

@ -85,7 +85,8 @@ import java.util.concurrent.TimeUnit;
public interface RMapReduce<KIn, VIn, KOut, VOut> extends RMapReduceExecutor<VIn, KOut, VOut> {
/**
* Defines timeout for MapReduce process
* Defines timeout for MapReduce process.
* <code>0</code> means infinity timeout.
*
* @param timeout
* @param unit

@ -16,6 +16,8 @@
package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.RInject;
@ -35,7 +37,7 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
protected RedissonClient redisson;
protected Class<?> objectClass;
protected String objectName;
protected List<String> objectNames = new ArrayList<String>();
protected Class<?> objectCodecClass;
protected int workersAmount;
@ -45,13 +47,20 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
public BaseMapperTask() {
}
public BaseMapperTask(Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
public BaseMapperTask(Class<?> objectClass, Class<?> objectCodecClass) {
super();
this.objectClass = objectClass;
this.objectName = objectName;
this.objectCodecClass = objectCodecClass;
}
public void addObjectName(String objectName) {
this.objectNames.add(objectName);
}
public void clearObjectNames() {
this.objectNames.clear();
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@ -63,5 +72,5 @@ public abstract class BaseMapperTask<KOut, VOut> implements Runnable, Serializab
public void setCollectorMapName(String collatorMapName) {
this.collectorMapName = collatorMapName;
}
}

@ -43,8 +43,8 @@ public class CollectionMapperTask<VIn, KOut, VOut> extends BaseMapperTask<KOut,
public CollectionMapperTask() {
}
public CollectionMapperTask(RCollectionMapper<VIn, KOut, VOut> mapper, Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
super(objectClass, objectName, objectCodecClass);
public CollectionMapperTask(RCollectionMapper<VIn, KOut, VOut> mapper, Class<?> objectClass, Class<?> objectCodecClass) {
super(objectClass, objectCodecClass);
this.mapper = mapper;
}
@ -59,31 +59,33 @@ public class CollectionMapperTask<VIn, KOut, VOut> extends BaseMapperTask<KOut,
Injector.inject(mapper, redisson);
Iterable<VIn> collection = null;
if (RSetCache.class.isAssignableFrom(objectClass)) {
collection = redisson.getSetCache(objectName, codec);
} else if (RSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSet(objectName, codec);
} else if (RSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSortedSet(objectName, codec);
} else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getScoredSortedSet(objectName, codec);
} else if (RLexSortedSet.class.isAssignableFrom(objectClass)) {
collection = (Iterable<VIn>) redisson.getLexSortedSet(objectName);
} else if (RList.class.isAssignableFrom(objectClass)) {
collection = redisson.getList(objectName, codec);
} else {
throw new IllegalStateException("Unable to work with " + objectClass);
}
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);
for (VIn value : collection) {
if (Thread.currentThread().isInterrupted()) {
return;
for (String objectName : objectNames) {
Iterable<VIn> collection = null;
if (RSetCache.class.isAssignableFrom(objectClass)) {
collection = redisson.getSetCache(objectName, codec);
} else if (RSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSet(objectName, codec);
} else if (RSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getSortedSet(objectName, codec);
} else if (RScoredSortedSet.class.isAssignableFrom(objectClass)) {
collection = redisson.getScoredSortedSet(objectName, codec);
} else if (RLexSortedSet.class.isAssignableFrom(objectClass)) {
collection = (Iterable<VIn>) redisson.getLexSortedSet(objectName);
} else if (RList.class.isAssignableFrom(objectClass)) {
collection = redisson.getList(objectName, codec);
} else {
throw new IllegalStateException("Unable to work with " + objectClass);
}
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);
for (VIn value : collection) {
if (Thread.currentThread().isInterrupted()) {
return;
}
mapper.map(value, collector);
}
mapper.map(value, collector);
}
}

@ -16,11 +16,8 @@
package org.redisson.mapreduce;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -34,9 +31,6 @@ import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RReducer;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
@ -46,25 +40,6 @@ import io.netty.util.concurrent.FutureListener;
*/
public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializable {
public static class LatchListener implements FutureListener<Object> {
private CountDownLatch latch;
public LatchListener() {
}
public LatchListener(CountDownLatch latch) {
super();
this.latch = latch;
}
@Override
public void operationComplete(Future<Object> future) throws Exception {
latch.countDown();
}
}
private static final long serialVersionUID = 7559371478909848610L;
@RInject
@ -124,59 +99,31 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
if (timeout > 0) {
mapperTask.setTimeout(timeout - timeSpent);
}
RFuture<?> mapperFuture = executor.submit(mapperTask);
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
mapperFuture.await();
} catch (InterruptedException e) {
return null;
}
}
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
final CountDownLatch latch = new CountDownLatch(workersAmount);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
RFuture<?> future = executor.submitAsync(runnable);
future.addListener(new LatchListener(latch));
futures.add(future);
}
if (Thread.currentThread().isInterrupted()) {
cancelReduce(futures);
return null;
}
timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
cancelReduce(futures);
throw new MapReduceTimeoutException();
}
mapperTask.addObjectName(objectName);
RFuture<?> mapperFuture = executor.submitAsync(mapperTask);
try {
if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) {
cancelReduce(futures);
if (timeout > 0 && !mapperFuture.await(timeout - timeSpent)) {
mapperFuture.cancel(true);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
try {
latch.await();
} catch (InterruptedException e) {
return null;
}
mapperFuture.await();
}
} catch (InterruptedException e) {
cancelReduce(futures);
mapperFuture.cancel(true);
return null;
}
for (RFuture<?> rFuture : futures) {
if (!rFuture.isSuccess()) {
throw (Exception) rFuture.cause();
}
SubTasksExecutor reduceExecutor = new SubTasksExecutor(executor, workersAmount, startTime, timeout);
for (int i = 0; i < workersAmount; i++) {
String name = collectorMapName + ":" + i;
Runnable runnable = new ReducerTask<KOut, VOut>(name, reducer, objectCodecClass, resultMapName, timeout - timeSpent);
reduceExecutor.submit(runnable);
}
if (!reduceExecutor.await()) {
return null;
}
return executeCollator();
@ -215,10 +162,4 @@ public class CoordinatorTask<KOut, VOut> implements Callable<Object>, Serializab
return timeSpent > timeout && timeout > 0;
}
private void cancelReduce(List<RFuture<?>> futures) {
for (RFuture<?> future : futures) {
future.cancel(true);
}
}
}

@ -42,8 +42,8 @@ public class MapperTask<KIn, VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut>
public MapperTask() {
}
public MapperTask(RMapper<KIn, VIn, KOut, VOut> mapper, Class<?> objectClass, String objectName, Class<?> objectCodecClass) {
super(objectClass, objectName, objectCodecClass);
public MapperTask(RMapper<KIn, VIn, KOut, VOut> mapper, Class<?> objectClass, Class<?> objectCodecClass) {
super(objectClass, objectCodecClass);
this.mapper = mapper;
}
@ -57,22 +57,23 @@ public class MapperTask<KIn, VIn, KOut, VOut> extends BaseMapperTask<KOut, VOut>
}
Injector.inject(mapper, redisson);
RMap<KIn, VIn> map = null;
if (RMapCache.class.isAssignableFrom(objectClass)) {
map = redisson.getMapCache(objectName, codec);
} else {
map = redisson.getMap(objectName, codec);
}
RCollector<KOut, VOut> collector = new Collector<KOut, VOut>(codec, redisson, collectorMapName, workersAmount, timeout);
for (Entry<KIn, VIn> entry : map.entrySet()) {
if (Thread.currentThread().isInterrupted()) {
return;
for (String objectName : objectNames) {
RMap<KIn, VIn> map = null;
if (RMapCache.class.isAssignableFrom(objectClass)) {
map = redisson.getMapCache(objectName, codec);
} else {
map = redisson.getMap(objectName, codec);
}
mapper.map(entry.getKey(), entry.getValue(), collector);
for (Entry<KIn, VIn> entry : map.entrySet()) {
if (Thread.currentThread().isInterrupted()) {
return;
}
mapper.map(entry.getKey(), entry.getValue(), collector);
}
}
}

@ -63,7 +63,7 @@ public class RedissonCollectionMapReduce<VIn, KOut, VOut> extends MapReduceExecu
@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
CollectionMapperTask<VIn, KOut, VOut> mapperTask = new CollectionMapperTask<VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
CollectionMapperTask<VIn, KOut, VOut> mapperTask = new CollectionMapperTask<VIn, KOut, VOut>(mapper, objectClass, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}

@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollator;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.api.mapreduce.RMapper;
import org.redisson.api.mapreduce.RReducer;
@ -65,7 +64,7 @@ public class RedissonMapReduce<KIn, VIn, KOut, VOut> extends MapReduceExecutor<R
@Override
protected Callable<Object> createTask(String resultMapName, RCollator<KOut, VOut, Object> collator) {
MapperTask<KIn, VIn, KOut, VOut> mapperTask = new MapperTask<KIn, VIn, KOut, VOut>(mapper, objectClass, objectName, objectCodec.getClass());
MapperTask<KIn, VIn, KOut, VOut> mapperTask = new MapperTask<KIn, VIn, KOut, VOut>(mapper, objectClass, objectCodec.getClass());
return new CoordinatorTask<KOut, VOut>(mapperTask, reducer, objectName, resultMapName, objectCodec.getClass(), objectClass, collator, timeout, System.currentTimeMillis());
}

@ -0,0 +1,115 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.mapreduce;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RExecutorService;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class SubTasksExecutor {
public static class LatchListener implements FutureListener<Object> {
private CountDownLatch latch;
public LatchListener() {
}
public LatchListener(CountDownLatch latch) {
super();
this.latch = latch;
}
@Override
public void operationComplete(Future<Object> future) throws Exception {
latch.countDown();
}
}
private final List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
private final CountDownLatch latch;
private final RExecutorService executor;
private final long startTime;
private final long timeout;
public SubTasksExecutor(RExecutorService executor, int workersAmount, long startTime, long timeout) {
this.executor = executor;
this.latch = new CountDownLatch(workersAmount);
this.startTime = startTime;
this.timeout = timeout;
}
public void submit(Runnable runnable) {
RFuture<?> future = executor.submitAsync(runnable);
future.addListener(new LatchListener(latch));
futures.add(future);
}
private void cancel(List<RFuture<?>> futures) {
for (RFuture<?> future : futures) {
future.cancel(true);
}
}
private boolean isTimeoutExpired(long timeSpent) {
return timeSpent > timeout && timeout > 0;
}
public boolean await() throws Exception {
if (Thread.currentThread().isInterrupted()) {
cancel(futures);
return false;
}
long timeSpent = System.currentTimeMillis() - startTime;
if (isTimeoutExpired(timeSpent)) {
cancel(futures);
throw new MapReduceTimeoutException();
}
try {
if (timeout > 0 && !latch.await(timeout - timeSpent, TimeUnit.MILLISECONDS)) {
cancel(futures);
throw new MapReduceTimeoutException();
}
if (timeout == 0) {
latch.await();
}
} catch (InterruptedException e) {
cancel(futures);
return false;
}
for (RFuture<?> rFuture : futures) {
if (!rFuture.isSuccess()) {
throw (Exception) rFuture.cause();
}
}
return true;
}
}
Loading…
Cancel
Save