From 110412297925415549cca50cfc0be78c30360171 Mon Sep 17 00:00:00 2001 From: "yihua.huang" Date: Sun, 27 Apr 2014 09:30:01 +0800 Subject: [PATCH] more abstraction in scheduler --- .../scheduler/DuplicatedRemoveScheduler.java | 45 ++++++++++++++++++ .../LocalDuplicatedRemoveScheduler.java | 34 ++++++++++++++ .../LocalDuplicatedRemovedScheduler.java | 47 ------------------- .../webmagic/scheduler/PriorityScheduler.java | 2 +- .../webmagic/scheduler/QueueScheduler.java | 2 +- .../scheduler/FileCacheQueueScheduler.java | 2 +- .../webmagic/scheduler/RedisScheduler.java | 44 ++++++++++++----- 7 files changed, 113 insertions(+), 63 deletions(-) create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java delete mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java new file mode 100644 index 00000000..7b319b6e --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/DuplicatedRemoveScheduler.java @@ -0,0 +1,45 @@ +package us.codecraft.webmagic.scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Task; + +/** + * Remove duplicate urls and only push urls which are not duplicate.

+ * + * @author code4crafer@gmail.com + * @since 0.5.0 + */ +public abstract class DuplicatedRemoveScheduler implements Scheduler { + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public void push(Request request, Task task) { + logger.trace("get a candidate url {}", request.getUrl()); + if (isDuplicate(request, task) || shouldReserved(request)) { + logger.debug("push to queue {}", request.getUrl()); + pushWhenNoDuplicate(request, task); + } + } + + /** + * Reset duplicate check for the given task so that urls seen before are accepted again. + */ + public abstract void resetDuplicateCheck(Task task); + + /** + * @param request candidate request whose url is checked + * @return true when the request should be pushed: despite the method name, implementations are expected to return true for a url that has NOT been seen before + */ + protected abstract boolean isDuplicate(Request request, Task task); + + protected boolean shouldReserved(Request request) { + return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; + } + + protected void pushWhenNoDuplicate(Request request, Task task) { + /* NOTE(review): no-op default; a subclass that does not override this silently drops every request */ + } +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java new file mode 100644 index 00000000..c127c98b --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemoveScheduler.java @@ -0,0 +1,34 @@ +package us.codecraft.webmagic.scheduler; + +import com.google.common.collect.Sets; +import us.codecraft.webmagic.Request; +import us.codecraft.webmagic.Task; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Base Scheduler with duplicated urls removed by hash set.

+ * + * @author code4crafter@gmail.com + * @since 0.5.0 + */ +public abstract class LocalDuplicatedRemoveScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { + + private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + + @Override + public void resetDuplicateCheck(Task task) { + urls.clear(); + } + + @Override + protected boolean isDuplicate(Request request, Task task) { + return urls.add(request.getUrl()); /* Set.add is true only for a url not seen before, which is what push() expects */ + } + + @Override + public int getTotalRequestsCount(Task task) { + return urls.size(); + } +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java deleted file mode 100644 index 1ec128b7..00000000 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/LocalDuplicatedRemovedScheduler.java +++ /dev/null @@ -1,47 +0,0 @@ -package us.codecraft.webmagic.scheduler; - -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import us.codecraft.webmagic.Request; -import us.codecraft.webmagic.Task; - -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Base Scheduler with duplicated urls removed locally. 
- * - * @author code4crafter@gmail.com - * @since 0.5.0 - */ -public abstract class LocalDuplicatedRemovedScheduler implements MonitorableScheduler { - - protected Logger logger = LoggerFactory.getLogger(getClass()); - - private Set urls = Sets.newSetFromMap(new ConcurrentHashMap()); - - @Override - public void push(Request request, Task task) { - logger.trace("get a candidate url {}", request.getUrl()); - if (isDuplicate(request) || shouldReserved(request)) { - logger.debug("push to queue {}", request.getUrl()); - pushWhenNoDuplicate(request, task); - } - } - - protected boolean isDuplicate(Request request) { - return urls.add(request.getUrl()); - } - - protected boolean shouldReserved(Request request) { - return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; - } - - @Override - public int getTotalRequestsCount(Task task) { - return urls.size(); - } - - protected abstract void pushWhenNoDuplicate(Request request, Task task); -} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java index a57a6fbc..38c9b6cc 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/PriorityScheduler.java @@ -17,7 +17,7 @@ import java.util.concurrent.PriorityBlockingQueue; * @since 0.2.1 */ @ThreadSafe -public class PriorityScheduler extends LocalDuplicatedRemovedScheduler { +public class PriorityScheduler extends LocalDuplicatedRemoveScheduler { public static final int INITIAL_CAPACITY = 5; diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java index e2a6e75b..511d8a0a 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java +++ 
b/webmagic-core/src/main/java/us/codecraft/webmagic/scheduler/QueueScheduler.java @@ -16,7 +16,7 @@ import java.util.concurrent.LinkedBlockingQueue; * @since 0.1.0 */ @ThreadSafe -public class QueueScheduler extends LocalDuplicatedRemovedScheduler { +public class QueueScheduler extends LocalDuplicatedRemoveScheduler { private BlockingQueue queue = new LinkedBlockingQueue(); diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java index 9d7668d8..4215ab83 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class FileCacheQueueScheduler extends LocalDuplicatedRemovedScheduler { +public class FileCacheQueueScheduler extends LocalDuplicatedRemoveScheduler { private Logger logger = LoggerFactory.getLogger(getClass()); diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java index cd3a0b65..dc2ee2ee 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java @@ -14,7 +14,7 @@ import us.codecraft.webmagic.Task; * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class RedisScheduler implements MonitorableScheduler { +public class RedisScheduler extends DuplicatedRemoveScheduler implements MonitorableScheduler { private JedisPool pool; @@ -33,21 +33,39 @@ public class RedisScheduler implements MonitorableScheduler { } @Override - public synchronized void push(Request request, Task task) { + public void resetDuplicateCheck(Task task) { Jedis jedis = pool.getResource(); try { - // if cycleRetriedTimes is set, allow duplicated. - Object cycleRetriedTimes = request.getExtra(Request.CYCLE_TRIED_TIMES); - // use set to remove duplicate url - if (cycleRetriedTimes != null || !jedis.sismember(getSetKey(task), request.getUrl())) { - // use list to store queue - jedis.rpush(getQueueKey(task), request.getUrl()); + jedis.del(getSetKey(task)); + } finally { + pool.returnResource(jedis); + } + } + + @Override + protected boolean isDuplicate(Request request, Task task) { + Jedis jedis = pool.getResource(); + try { + boolean isDuplicate = !jedis.sismember(getSetKey(task), request.getUrl()); + if (isDuplicate) { /* true here means the url is not in the set yet, so record it */ jedis.sadd(getSetKey(task), request.getUrl()); - if (request.getExtras() != null) { - String field = DigestUtils.shaHex(request.getUrl()); - String value = JSON.toJSONString(request); - jedis.hset((ITEM_PREFIX + task.getUUID()), field, value); - } + } + return isDuplicate; + } finally { + pool.returnResource(jedis); + } + + } + + @Override + protected void pushWhenNoDuplicate(Request request, Task task) { + Jedis jedis = pool.getResource(); + try { + jedis.rpush(getQueueKey(task), request.getUrl()); + if (request.getExtras() != null) { + String field = DigestUtils.shaHex(request.getUrl()); + String value = JSON.toJSONString(request); + jedis.hset((ITEM_PREFIX + task.getUUID()), field, value); } } finally { pool.returnResource(jedis);