more abstraction in scheduler

pull/121/head
yihua.huang 11 years ago
parent b0fb1c3e10
commit 1104122979

@@ -0,0 +1,45 @@
package us.codecraft.webmagic.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;

/**
 * Removes duplicate urls and only pushes urls which have not been scheduled before.<br/>
 *
 * @author code4crafter@gmail.com
 * @since 0.5.0
 */
public abstract class DuplicatedRemoveScheduler implements Scheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        // push only when the url has not been scheduled before,
        // or when the request is a retried one that must be re-queued
        if (!isDuplicate(request, task) || shouldReserved(request)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

    /**
     * Reset the duplicate check, forgetting all urls seen so far.
     */
    public abstract void resetDuplicateCheck(Task task);

    /**
     * @param request the request to check
     * @return true if the url of the request has already been scheduled
     */
    protected abstract boolean isDuplicate(Request request, Task task);

    protected boolean shouldReserved(Request request) {
        // a retried request carries CYCLE_TRIED_TIMES and bypasses the duplicate check
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }

    protected void pushWhenNoDuplicate(Request request, Task task) {
        // no-op by default; concrete schedulers enqueue the request here
    }
}
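
The new base class is a template method: push() owns the logging and the duplicate/retry decision, and concrete schedulers only supply the three hooks. A minimal sketch of such a subclass, illustrative only and not part of this commit (the class and field names are made up, and it skips the thread safety the shipped schedulers have):

import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;

// Hypothetical in-memory scheduler built on the new hooks.
public class SimpleMemoryScheduler extends DuplicatedRemoveScheduler {

    private final Set<String> seen = new HashSet<String>();
    private final Queue<Request> queue = new LinkedList<Request>();

    @Override
    protected boolean isDuplicate(Request request, Task task) {
        // true = already scheduled, so the base class skips the push
        return !seen.add(request.getUrl());
    }

    @Override
    public void resetDuplicateCheck(Task task) {
        seen.clear();
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        queue.offer(request);
    }

    @Override
    public Request poll(Task task) {
        return queue.poll();
    }
}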

@@ -0,0 +1,34 @@
package us.codecraft.webmagic.scheduler;

import com.google.common.collect.Sets;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base Scheduler with duplicate urls removed by a hash set.<br/>
 *
 * @author code4crafter@gmail.com
 * @since 0.5.0
 */
public abstract class LocalDuplicatedRemoveScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler {

    private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    @Override
    public void resetDuplicateCheck(Task task) {
        urls.clear();
    }

    @Override
    protected boolean isDuplicate(Request request, Task task) {
        // Set.add() returns false when the url was already present
        return !urls.add(request.getUrl());
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return urls.size();
    }
}
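
A quick usage sketch of the concrete in-memory schedulers changed below (e.g. QueueScheduler): the second push of the same url is dropped, while a request marked with CYCLE_TRIED_TIMES is re-queued. Illustrative only, not part of the diff; it assumes the usual Request/Task API and QueueScheduler's MonitorableScheduler counters.

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.QueueScheduler;

public class SchedulerDemo {

    public static void main(String[] args) {
        // throwaway Task stub: only the UUID matters to the scheduler
        Task task = new Task() {
            @Override
            public String getUUID() {
                return "demo";
            }

            @Override
            public Site getSite() {
                return null;
            }
        };

        QueueScheduler scheduler = new QueueScheduler();
        scheduler.push(new Request("http://example.com/page"), task);
        scheduler.push(new Request("http://example.com/page"), task); // duplicate url, dropped

        // a retried request carries CYCLE_TRIED_TIMES and bypasses the duplicate check
        Request retry = new Request("http://example.com/page");
        retry.putExtra(Request.CYCLE_TRIED_TIMES, 1);
        scheduler.push(retry, task);

        System.out.println(scheduler.getLeftRequestsCount(task));  // 2 requests queued
        System.out.println(scheduler.getTotalRequestsCount(task)); // 1 distinct url seen
    }
}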

@@ -1,47 +0,0 @@
package us.codecraft.webmagic.scheduler;

import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base Scheduler with duplicated urls removed locally.
 *
 * @author code4crafter@gmail.com
 * @since 0.5.0
 */
public abstract class LocalDuplicatedRemovedScheduler implements MonitorableScheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    @Override
    public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        if (isDuplicate(request) || shouldReserved(request)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

    protected boolean isDuplicate(Request request) {
        return urls.add(request.getUrl());
    }

    protected boolean shouldReserved(Request request) {
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return urls.size();
    }

    protected abstract void pushWhenNoDuplicate(Request request, Task task);
}

@@ -17,7 +17,7 @@ import java.util.concurrent.PriorityBlockingQueue;
 * @since 0.2.1
 */
@ThreadSafe
-public class PriorityScheduler extends LocalDuplicatedRemovedScheduler {
+public class PriorityScheduler extends LocalDuplicatedRemoveScheduler {
public static final int INITIAL_CAPACITY = 5;

@@ -16,7 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 * @since 0.1.0
 */
@ThreadSafe
-public class QueueScheduler extends LocalDuplicatedRemovedScheduler {
+public class QueueScheduler extends LocalDuplicatedRemoveScheduler {
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();

@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 * @author code4crafter@gmail.com <br>
 * @since 0.2.0
 */
-public class FileCacheQueueScheduler extends LocalDuplicatedRemovedScheduler {
+public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler {
private Logger logger = LoggerFactory.getLogger(getClass());

@@ -14,7 +14,7 @@ import us.codecraft.webmagic.Task;
 * @author code4crafter@gmail.com <br>
 * @since 0.2.0
 */
-public class RedisScheduler implements MonitorableScheduler {
+public class RedisScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler {
private JedisPool pool;
@@ -33,22 +33,40 @@ public class RedisScheduler implements MonitorableScheduler {
    }

-    @Override
-    public synchronized void push(Request request, Task task) {
-        Jedis jedis = pool.getResource();
-        try {
-            // if cycleRetriedTimes is set, allow duplicated.
-            Object cycleRetriedTimes = request.getExtra(Request.CYCLE_TRIED_TIMES);
-            // use set to remove duplicate url
-            if (cycleRetriedTimes != null || !jedis.sismember(getSetKey(task), request.getUrl())) {
-                // use list to store queue
-                jedis.rpush(getQueueKey(task), request.getUrl());
-                if (request.getExtras() != null) {
-                    String field = DigestUtils.shaHex(request.getUrl());
-                    String value = JSON.toJSONString(request);
-                    jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
-                }
-            }
-        } finally {
-            pool.returnResource(jedis);
-        }
-    }
+    @Override
+    public void resetDuplicateCheck(Task task) {
+        Jedis jedis = pool.getResource();
+        try {
+            jedis.del(getSetKey(task));
+        } finally {
+            pool.returnResource(jedis);
+        }
+    }
+
+    @Override
+    protected boolean isDuplicate(Request request, Task task) {
+        Jedis jedis = pool.getResource();
+        try {
+            // record the url the first time it is seen, so later pushes of the same url are skipped
+            boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());
+            if (!isDuplicate) {
+                jedis.sadd(getSetKey(task), request.getUrl());
+            }
+            return isDuplicate;
+        } finally {
+            pool.returnResource(jedis);
+        }
+    }
+
+    @Override
+    protected void pushWhenNoDuplicate(Request request, Task task) {
+        Jedis jedis = pool.getResource();
+        try {
+            jedis.rpush(getQueueKey(task), request.getUrl());
+            if (request.getExtras() != null) {
+                String field = DigestUtils.shaHex(request.getUrl());
+                String value = JSON.toJSONString(request);
+                jedis.hset((ITEM_PREFIX + task.getUUID()), field, value);
+            }
+        } finally {
+            pool.returnResource(jedis);
+        }
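
The Redis-backed scheduler now plugs into the same hooks: a per-task set (getSetKey) answers isDuplicate, a list (getQueueKey) holds the queue, and a hash keyed by the url's SHA-1 stores the serialized request when it has extras. A usage sketch, illustrative only and assuming a Redis instance on localhost and RedisScheduler's host-only constructor:

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.RedisScheduler;

public class RedisSchedulerDemo {

    // Illustrative only: requires a running Redis server on localhost.
    public static void run(Task task) {
        RedisScheduler scheduler = new RedisScheduler("localhost");

        scheduler.push(new Request("http://example.com/a"), task);
        scheduler.push(new Request("http://example.com/a"), task); // url already in the dedup set, dropped

        Request next = scheduler.poll(task);   // pops the next url from the per-task queue list
        scheduler.resetDuplicateCheck(task);   // deletes the dedup set so urls can be scheduled again
    }
}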
